From 19a5a384843e136cb531a2cdb919485aa9b66a16 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Thu, 21 Nov 2024 20:46:28 +0000 Subject: [PATCH 1/9] feat: asynchronous trie prefetching --- core/state/statedb.go | 4 +- core/state/trie_prefetcher.go | 21 ++++-- core/state/trie_prefetcher.libevm.go | 108 +++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 core/state/trie_prefetcher.libevm.go diff --git a/core/state/statedb.go b/core/state/statedb.go index 3b706002e76..d641fb3b042 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -175,13 +175,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. -func (s *StateDB) StartPrefetcher(namespace string) { +func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) { if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...) 
} } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 45fac913dd0..b56d593c2f5 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/libevm/options" "github.com/ava-labs/libevm/log" "github.com/ava-labs/libevm/metrics" ) @@ -49,9 +50,11 @@ type triePrefetcher struct { storageDupMeter metrics.Meter storageSkipMeter metrics.Meter storageWasteMeter metrics.Meter + + options []PrefetcherOption } -func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { +func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ db: db, @@ -67,6 +70,8 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), + + options: opts, } return p } @@ -122,6 +127,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { storageDupMeter: p.storageDupMeter, storageSkipMeter: p.storageSkipMeter, storageWasteMeter: p.storageWasteMeter, + + options: p.options, } // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { @@ -150,7 +157,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { - fetcher = newSubfetcher(p.db, p.root, owner, root, addr) + fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...) 
p.fetchers[id] = fetcher } fetcher.schedule(keys) @@ -226,11 +233,13 @@ type subfetcher struct { seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks used [][]byte // Tracks the entries used in the end + + pool *subfetcherPool } // newSubfetcher creates a goroutine to prefetch state items belonging to a // particular root hash. -func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { +func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher { sf := &subfetcher{ db: db, state: state, @@ -243,6 +252,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo copy: make(chan chan Trie), seen: make(map[string]struct{}), } + options.As[prefetcherConfig](opts...).apply(sf) go sf.loop() return sf } @@ -293,6 +303,7 @@ func (sf *subfetcher) abort() { // loop waits for new tasks to be scheduled and keeps loading them until it runs // out of tasks or its underlying trie is retrieved for committing. func (sf *subfetcher) loop() { + defer sf.wait() // No matter how the loop stops, signal anyone waiting that it's terminated defer close(sf.term) @@ -344,9 +355,9 @@ func (sf *subfetcher) loop() { sf.dups++ } else { if len(task) == common.AddressLength { - sf.trie.GetAccount(common.BytesToAddress(task)) + sf.GetAccount(common.BytesToAddress(task)) } else { - sf.trie.GetStorage(sf.addr, task) + sf.GetStorage(sf.addr, task) } sf.seen[string(task)] = struct{}{} } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go new file mode 100644 index 00000000000..e62844db747 --- /dev/null +++ b/core/state/trie_prefetcher.libevm.go @@ -0,0 +1,108 @@ +// Copyright 2024 the libevm authors. 
+// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +package state + +import ( + "sync" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/libevm/options" + "github.com/ava-labs/libevm/log" +) + +// A PrefetcherOption configures behaviour of trie prefetching. +type PrefetcherOption = options.Option[prefetcherConfig] + +type prefetcherConfig struct { + newWorkers func() WorkerPool +} + +// A WorkerPool is responsible for executing functions, possibly asynchronously. +type WorkerPool interface { + Execute(func()) + Wait() +} + +// WithWorkerPools configures trie prefetching to execute asynchronously. The +// provided constructor is called once for each trie being fetched and it MAY +// return the same pool. +func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { + return options.Func[prefetcherConfig](func(c *prefetcherConfig) { + c.newWorkers = ctor + }) +} + +type subfetcherPool struct { + workers WorkerPool + tries sync.Pool +} + +// apply configures the [subfetcher] to use a [WorkerPool] if one was provided +// with a [PrefetcherOption]. +func (c *prefetcherConfig) apply(sf *subfetcher) { + sf.pool = &subfetcherPool{ + tries: sync.Pool{ + // Although the workers may be shared between all subfetchers, each + // MUST have its own Trie pool. 
+ New: func() any { + return sf.db.CopyTrie(sf.trie) + }, + }, + } + if c.newWorkers != nil { + sf.pool.workers = c.newWorkers() + } +} + +func (sf *subfetcher) wait() { + if w := sf.pool.workers; w != nil { + w.Wait() + } +} + +// execute runs the provided function with a copy of the subfetcher's Trie. +// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was +// configured with a [WorkerPool] then it is used for function execution, +// otherwise `fn` is just called directly. +func (sf *subfetcher) execute(fn func(Trie)) { + trie := sf.pool.tries.Get().(Trie) + if w := sf.pool.workers; w != nil { + w.Execute(func() { fn(trie) }) + } else { + fn(trie) + } + sf.pool.tries.Put(trie) +} + +// GetAccount optimistically pre-fetches an account, dropping the returned value +// and logging errors. See [subfetcher.execute] re worker pools. +func (sf *subfetcher) GetAccount(addr common.Address) { + sf.execute(func(t Trie) { + if _, err := t.GetAccount(addr); err != nil { + log.Error("account prefetching failed", "address", addr, "err", err) + } + }) +} + +// GetStorage is the storage equivalent of [subfetcher.GetAccount]. 
+func (sf *subfetcher) GetStorage(addr common.Address, key []byte) { + sf.execute(func(t Trie) { + if _, err := t.GetStorage(addr, key); err != nil { + log.Error("storage prefetching failed", "address", addr, "key", key, "err", err) + } + }) +} From e1707fe58f4f3a473fbeb6df37c0660ea46c7f2b Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Thu, 21 Nov 2024 20:53:46 +0000 Subject: [PATCH 2/9] feat: exhaust prefetcher tasks before termination --- core/state/trie_prefetcher.go | 26 ++++++++++++++++++++++---- core/state/trie_prefetcher.libevm.go | 11 ++++++----- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index b56d593c2f5..63b49bddb5f 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -252,7 +252,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo copy: make(chan chan Trie), seen: make(map[string]struct{}), } - options.As[prefetcherConfig](opts...).apply(sf) + options.As[prefetcherConfig](opts...).applyTo(sf) go sf.loop() return sf } @@ -303,7 +303,7 @@ func (sf *subfetcher) abort() { // loop waits for new tasks to be scheduled and keeps loading them until it runs // out of tasks or its underlying trie is retrieved for committing. func (sf *subfetcher) loop() { - defer sf.wait() + defer sf.pool.wait() // No matter how the loop stops, signal anyone waiting that it's terminated defer close(sf.term) @@ -369,8 +369,26 @@ func (sf *subfetcher) loop() { ch <- sf.db.CopyTrie(sf.trie) case <-sf.stop: - // Termination is requested, abort and leave remaining tasks - return + //libevm:start + // + // This is copied, with alteration, from ethereum/go-ethereum#29519 + // and can be deleted once we update to include that change. + + // Termination is requested, abort if no more tasks are pending. If + // there are some, exhaust them first. 
+ sf.lock.Lock() + done := len(sf.tasks) == 0 + sf.lock.Unlock() + + if done { + return + } + + select { + case sf.wake <- struct{}{}: + default: + } + //libevm:end } } } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index e62844db747..748a88507e6 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -51,9 +51,9 @@ type subfetcherPool struct { tries sync.Pool } -// apply configures the [subfetcher] to use a [WorkerPool] if one was provided +// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided // with a [PrefetcherOption]. -func (c *prefetcherConfig) apply(sf *subfetcher) { +func (c *prefetcherConfig) applyTo(sf *subfetcher) { sf.pool = &subfetcherPool{ tries: sync.Pool{ // Although the workers may be shared between all subfetchers, each @@ -68,10 +68,11 @@ func (c *prefetcherConfig) apply(sf *subfetcher) { } } -func (sf *subfetcher) wait() { - if w := sf.pool.workers; w != nil { - w.Wait() +func (p *subfetcherPool) wait() { + if p == nil || p.workers == nil { + return } + p.workers.Wait() } // execute runs the provided function with a copy of the subfetcher's Trie. 
From 02d59d9491d635badd7bbe026aafde08f42f0dca Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Fri, 22 Nov 2024 10:00:00 +0000 Subject: [PATCH 3/9] refactor: move new methods onto `subfetcherPool` --- core/state/trie_prefetcher.go | 4 ++-- core/state/trie_prefetcher.libevm.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 63b49bddb5f..06daa4f8adb 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -355,9 +355,9 @@ func (sf *subfetcher) loop() { sf.dups++ } else { if len(task) == common.AddressLength { - sf.GetAccount(common.BytesToAddress(task)) + sf.pool.GetAccount(common.BytesToAddress(task)) } else { - sf.GetStorage(sf.addr, task) + sf.pool.GetStorage(sf.addr, task) } sf.seen[string(task)] = struct{}{} } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 748a88507e6..e18e00c2f2b 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -79,20 +79,20 @@ func (p *subfetcherPool) wait() { // Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was // configured with a [WorkerPool] then it is used for function execution, // otherwise `fn` is just called directly. -func (sf *subfetcher) execute(fn func(Trie)) { - trie := sf.pool.tries.Get().(Trie) - if w := sf.pool.workers; w != nil { +func (p *subfetcherPool) execute(fn func(Trie)) { + trie := p.tries.Get().(Trie) + if w := p.workers; w != nil { w.Execute(func() { fn(trie) }) } else { fn(trie) } - sf.pool.tries.Put(trie) + p.tries.Put(trie) } // GetAccount optimistically pre-fetches an account, dropping the returned value // and logging errors. See [subfetcher.execute] re worker pools. 
-func (sf *subfetcher) GetAccount(addr common.Address) { - sf.execute(func(t Trie) { +func (p *subfetcherPool) GetAccount(addr common.Address) { + p.execute(func(t Trie) { if _, err := t.GetAccount(addr); err != nil { log.Error("account prefetching failed", "address", addr, "err", err) } @@ -100,8 +100,8 @@ func (sf *subfetcher) GetAccount(addr common.Address) { } // GetStorage is the storage equivalent of [subfetcher.GetAccount]. -func (sf *subfetcher) GetStorage(addr common.Address, key []byte) { - sf.execute(func(t Trie) { +func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) { + p.execute(func(t Trie) { if _, err := t.GetStorage(addr, key); err != nil { log.Error("storage prefetching failed", "address", addr, "key", key, "err", err) } From ab480f09f4f3b4b0166409ec0c896a22be6b8ee0 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Fri, 22 Nov 2024 16:24:50 +0000 Subject: [PATCH 4/9] fix: `Pool.Put()` trie copy _inside_ concurrent function --- core/state/trie_prefetcher.libevm.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index e18e00c2f2b..553bd15551b 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -31,14 +31,14 @@ type prefetcherConfig struct { newWorkers func() WorkerPool } -// A WorkerPool is responsible for executing functions, possibly asynchronously. +// A WorkerPool executes functions asynchronously. type WorkerPool interface { Execute(func()) Wait() } // WithWorkerPools configures trie prefetching to execute asynchronously. The -// provided constructor is called once for each trie being fetched and it MAY +// provided constructor is called once for each trie being fetched but it MAY // return the same pool. 
func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { return options.Func[prefetcherConfig](func(c *prefetcherConfig) { @@ -76,21 +76,24 @@ func (p *subfetcherPool) wait() { } // execute runs the provided function with a copy of the subfetcher's Trie. -// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was +// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was // configured with a [WorkerPool] then it is used for function execution, // otherwise `fn` is just called directly. func (p *subfetcherPool) execute(fn func(Trie)) { - trie := p.tries.Get().(Trie) + do := func() { + t := p.tries.Get().(Trie) + fn(t) + p.tries.Put(t) + } if w := p.workers; w != nil { - w.Execute(func() { fn(trie) }) + w.Execute(do) } else { - fn(trie) + do() } - p.tries.Put(trie) } // GetAccount optimistically pre-fetches an account, dropping the returned value -// and logging errors. See [subfetcher.execute] re worker pools. +// and logging errors. See [subfetcherPool.execute] re worker pools. func (p *subfetcherPool) GetAccount(addr common.Address) { p.execute(func(t Trie) { if _, err := t.GetAccount(addr); err != nil { @@ -99,7 +102,7 @@ func (p *subfetcherPool) GetAccount(addr common.Address) { }) } -// GetStorage is the storage equivalent of [subfetcher.GetAccount]. +// GetStorage is the storage equivalent of [subfetcherPool.GetAccount]. 
func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) { p.execute(func(t Trie) { if _, err := t.GetStorage(addr, key); err != nil { From 12e1a9cc49c571cc22e4efe49aa87b5cb3cb01de Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Fri, 22 Nov 2024 18:30:40 +0000 Subject: [PATCH 5/9] test: `StateDB.StopPrefetcher()` blocks on `WorkerPool.Wait()` --- core/state/trie_prefetcher.go | 7 ++- core/state/trie_prefetcher.libevm.go | 15 +++++ core/state/trie_prefetcher.libevm_test.go | 74 +++++++++++++++++++++++ 3 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 core/state/trie_prefetcher.libevm_test.go diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 06daa4f8adb..6ecbcc004e6 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -79,6 +79,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ... // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { + p.abortFetchersConcurrently() for _, fetcher := range p.fetchers { fetcher.abort() // safe to do multiple times @@ -303,9 +304,11 @@ func (sf *subfetcher) abort() { // loop waits for new tasks to be scheduled and keeps loading them until it runs // out of tasks or its underlying trie is retrieved for committing. 
func (sf *subfetcher) loop() { - defer sf.pool.wait() // No matter how the loop stops, signal anyone waiting that it's terminated - defer close(sf.term) + defer func() { + sf.pool.wait() + close(sf.term) + }() // Start by opening the trie and stop processing if it fails if sf.owner == (common.Hash{}) { diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 553bd15551b..9115a7ebcfa 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -68,6 +68,21 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) { } } +// abortFetchersConcurrently calls [subfetcher.abort] on every fetcher, blocking +// until all return. Calling abort() sequentially may result in later fetchers +// accepting new work in the interim. +func (p *triePrefetcher) abortFetchersConcurrently() { + var wg sync.WaitGroup + for _, f := range p.fetchers { + wg.Add(1) + go func(f *subfetcher) { + f.abort() + wg.Done() + }(f) + } + wg.Wait() +} + func (p *subfetcherPool) wait() { if p == nil || p.workers == nil { return diff --git a/core/state/trie_prefetcher.libevm_test.go b/core/state/trie_prefetcher.libevm_test.go new file mode 100644 index 00000000000..bb51dd26e16 --- /dev/null +++ b/core/state/trie_prefetcher.libevm_test.go @@ -0,0 +1,74 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. 
+// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +package state + +import ( + "testing" + "time" + + "github.com/ava-labs/libevm/common" +) + +type synchronisingWorkerPool struct { + executed, unblock chan struct{} +} + +var _ WorkerPool = (*synchronisingWorkerPool)(nil) + +func (p *synchronisingWorkerPool) Execute(func()) { + select { + case <-p.executed: + default: + close(p.executed) + } +} + +func (p *synchronisingWorkerPool) Wait() { + <-p.unblock +} + +func TestStopPrefetcherWaitsOnWorkers(t *testing.T) { + pool := &synchronisingWorkerPool{ + executed: make(chan struct{}), + unblock: make(chan struct{}), + } + opt := WithWorkerPools(func() WorkerPool { return pool }) + + db := filledStateDB() + db.prefetcher = newTriePrefetcher(db.db, db.originalRoot, "", opt) + db.prefetcher.prefetch(common.Hash{}, common.Hash{}, common.Address{}, [][]byte{{}}) + + go func() { + <-pool.executed + // Sleep otherwise there is a small chance that we close pool.unblock + // between db.StopPrefetcher() returning and the select receiving on the + // channel. + time.Sleep(time.Second) + close(pool.unblock) + }() + + <-pool.executed + db.StopPrefetcher() + select { + case <-pool.unblock: + // The channel was closed, therefore pool.Wait() unblocked. This is a + // necessary pre-condition for db.StopPrefetcher() unblocking, and the + // purpose of this test. 
+ default: + t.Errorf("%T.StopPrefetcher() returned before %T.Wait() unblocked", db, pool) + } +} From e4716bc126acc01d756f3d31c9dcad72fd741576 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Fri, 22 Nov 2024 20:26:36 +0000 Subject: [PATCH 6/9] refactor: `subfetcher`s track outstanding tasks --- core/state/trie_prefetcher.go | 2 +- core/state/trie_prefetcher.libevm.go | 31 ++++++++++++++------- core/state/trie_prefetcher.libevm_test.go | 33 +++++++++++++---------- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 6ecbcc004e6..1307e972bb5 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -79,7 +79,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ... // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { - p.abortFetchersConcurrently() + p.abortFetchersAndReleaseWorkerPools() for _, fetcher := range p.fetchers { fetcher.abort() // safe to do multiple times diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 9115a7ebcfa..1371d1dd8bf 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -31,10 +31,12 @@ type prefetcherConfig struct { newWorkers func() WorkerPool } -// A WorkerPool executes functions asynchronously. +// A WorkerPool executes functions asynchronously. Done() is called to signal +// that the pool is no longer needed and that Execute() is guaranteed to not be +// called again. type WorkerPool interface { Execute(func()) - Wait() + Done() } // WithWorkerPools configures trie prefetching to execute asynchronously. 
The @@ -49,6 +51,7 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { type subfetcherPool struct { workers WorkerPool tries sync.Pool + wg sync.WaitGroup } // applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided @@ -68,10 +71,9 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) { } } -// abortFetchersConcurrently calls [subfetcher.abort] on every fetcher, blocking -// until all return. Calling abort() sequentially may result in later fetchers -// accepting new work in the interim. -func (p *triePrefetcher) abortFetchersConcurrently() { +func (p *triePrefetcher) abortFetchersAndReleaseWorkerPools() { + // Calling abort() sequentially may result in later fetchers accepting new + // work in the interim. var wg sync.WaitGroup for _, f := range p.fetchers { wg.Add(1) @@ -80,14 +82,20 @@ func (p *triePrefetcher) abortFetchersConcurrently() { wg.Done() }(f) } + + // A WorkerPool is allowed to be shared between fetchers so we MUST wait for + // them to finish all tasks otherwise they could call Execute() after + // Done(), which we guarantee in the public API to be impossible. wg.Wait() + for _, f := range p.fetchers { + if w := f.pool.workers; w != nil { + w.Done() + } + } } func (p *subfetcherPool) wait() { - if p == nil || p.workers == nil { - return - } - p.workers.Wait() + p.wg.Wait() } // execute runs the provided function with a copy of the subfetcher's Trie. @@ -95,11 +103,14 @@ func (p *subfetcherPool) wait() { // configured with a [WorkerPool] then it is used for function execution, // otherwise `fn` is just called directly. 
func (p *subfetcherPool) execute(fn func(Trie)) { + p.wg.Add(1) do := func() { t := p.tries.Get().(Trie) fn(t) p.tries.Put(t) + p.wg.Done() } + if w := p.workers; w != nil { w.Execute(do) } else { diff --git a/core/state/trie_prefetcher.libevm_test.go b/core/state/trie_prefetcher.libevm_test.go index bb51dd26e16..a531eacdbba 100644 --- a/core/state/trie_prefetcher.libevm_test.go +++ b/core/state/trie_prefetcher.libevm_test.go @@ -21,28 +21,39 @@ import ( "time" "github.com/ava-labs/libevm/common" + "github.com/stretchr/testify/assert" ) type synchronisingWorkerPool struct { - executed, unblock chan struct{} + t *testing.T + executed, unblock chan struct{} + done bool + preconditionsToStopPrefetcher int } var _ WorkerPool = (*synchronisingWorkerPool)(nil) -func (p *synchronisingWorkerPool) Execute(func()) { +func (p *synchronisingWorkerPool) Execute(fn func()) { + fn() select { case <-p.executed: default: close(p.executed) } -} -func (p *synchronisingWorkerPool) Wait() { <-p.unblock + assert.False(p.t, p.done, "Done() called before Execute() returns") + p.preconditionsToStopPrefetcher++ +} + +func (p *synchronisingWorkerPool) Done() { + p.done = true + p.preconditionsToStopPrefetcher++ } func TestStopPrefetcherWaitsOnWorkers(t *testing.T) { pool := &synchronisingWorkerPool{ + t: t, executed: make(chan struct{}), unblock: make(chan struct{}), } @@ -55,20 +66,14 @@ func TestStopPrefetcherWaitsOnWorkers(t *testing.T) { go func() { <-pool.executed // Sleep otherwise there is a small chance that we close pool.unblock - // between db.StopPrefetcher() returning and the select receiving on the - // channel. + // between db.StopPrefetcher() returning and the assertion. time.Sleep(time.Second) close(pool.unblock) }() <-pool.executed db.StopPrefetcher() - select { - case <-pool.unblock: - // The channel was closed, therefore pool.Wait() unblocked. This is a - // necessary pre-condition for db.StopPrefetcher() unblocking, and the - // purpose of this test. 
- default: - t.Errorf("%T.StopPrefetcher() returned before %T.Wait() unblocked", db, pool) - } + // If this happens then either Execute() hadn't returned or Done() wasn't + // called. + assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db) } From 23b36e5ac7d1eb3fa6dc05ba06500aba110a2634 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Sat, 23 Nov 2024 16:08:03 +0000 Subject: [PATCH 7/9] refactor: `releaseWorkerPools()` --- core/state/trie_prefetcher.go | 2 +- core/state/trie_prefetcher.libevm.go | 21 +++++---------------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 1307e972bb5..e4a12984f75 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -79,7 +79,6 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ... // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { - p.abortFetchersAndReleaseWorkerPools() for _, fetcher := range p.fetchers { fetcher.abort() // safe to do multiple times @@ -105,6 +104,7 @@ func (p *triePrefetcher) close() { } } } + p.releaseWorkerPools() // Clear out all fetchers (will crash on a second call, deliberate) p.fetchers = nil } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 1371d1dd8bf..6ff3216c30f 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -71,22 +71,11 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) { } } -func (p *triePrefetcher) abortFetchersAndReleaseWorkerPools() { - // Calling abort() sequentially may result in later fetchers accepting new - // work in the interim. 
- var wg sync.WaitGroup - for _, f := range p.fetchers { - wg.Add(1) - go func(f *subfetcher) { - f.abort() - wg.Done() - }(f) - } - - // A WorkerPool is allowed to be shared between fetchers so we MUST wait for - // them to finish all tasks otherwise they could call Execute() after - // Done(), which we guarantee in the public API to be impossible. - wg.Wait() +// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be +// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed +// to be shared between them. This is because we guarantee in the public API +// that no further calls will be made to Execute() after a call to Done(). +func (p *triePrefetcher) releaseWorkerPools() { for _, f := range p.fetchers { if w := f.pool.workers; w != nil { w.Done() From 2553546850ca487c5158694afb82c59036eea5a3 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Sat, 23 Nov 2024 18:47:18 +0000 Subject: [PATCH 8/9] feat: `libevm/sync.Pool[T]` for type safety --- core/state/trie_prefetcher.libevm.go | 11 +++-- core/state/trie_prefetcher.libevm_test.go | 5 ++- libevm/sync/sync.go | 52 +++++++++++++++++++++++ 3 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 libevm/sync/sync.go diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 6ff3216c30f..abee575be31 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -17,10 +17,9 @@ package state import ( - "sync" - "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/libevm/options" + "github.com/ava-labs/libevm/libevm/sync" "github.com/ava-labs/libevm/log" ) @@ -50,7 +49,7 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { type subfetcherPool struct { workers WorkerPool - tries sync.Pool + tries sync.Pool[Trie] wg sync.WaitGroup } @@ -58,10 +57,10 @@ type subfetcherPool struct { // with a [PrefetcherOption]. 
func (c *prefetcherConfig) applyTo(sf *subfetcher) { sf.pool = &subfetcherPool{ - tries: sync.Pool{ + tries: sync.Pool[Trie]{ // Although the workers may be shared between all subfetchers, each // MUST have its own Trie pool. - New: func() any { + New: func() Trie { return sf.db.CopyTrie(sf.trie) }, }, @@ -94,7 +93,7 @@ func (p *subfetcherPool) wait() { func (p *subfetcherPool) execute(fn func(Trie)) { p.wg.Add(1) do := func() { - t := p.tries.Get().(Trie) + t := p.tries.Get() fn(t) p.tries.Put(t) p.wg.Done() diff --git a/core/state/trie_prefetcher.libevm_test.go b/core/state/trie_prefetcher.libevm_test.go index a531eacdbba..884bfba5677 100644 --- a/core/state/trie_prefetcher.libevm_test.go +++ b/core/state/trie_prefetcher.libevm_test.go @@ -20,8 +20,9 @@ import ( "testing" "time" - "github.com/ava-labs/libevm/common" "github.com/stretchr/testify/assert" + + "github.com/ava-labs/libevm/common" ) type synchronisingWorkerPool struct { @@ -73,7 +74,7 @@ func TestStopPrefetcherWaitsOnWorkers(t *testing.T) { <-pool.executed db.StopPrefetcher() - // If this happens then either Execute() hadn't returned or Done() wasn't + // If this errors then either Execute() hadn't returned or Done() wasn't // called. assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db) } diff --git a/libevm/sync/sync.go b/libevm/sync/sync.go new file mode 100644 index 00000000000..991a3a875ee --- /dev/null +++ b/libevm/sync/sync.go @@ -0,0 +1,52 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. 
+// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// <http://www.gnu.org/licenses/>. + +// Package sync extends the standard library's sync package. +package sync + +import "sync" + +// Aliases of stdlib sync's types to avoid having to import it alongside this +// package. +type ( + Cond = sync.Cond + Locker = sync.Locker + Map = sync.Map + Mutex = sync.Mutex + Once = sync.Once + RWMutex = sync.RWMutex + WaitGroup = sync.WaitGroup +) + +// A Pool is a type-safe wrapper around [sync.Pool]. +type Pool[T any] struct { + New func() T + pool sync.Pool + once Once +} + +// Get is equivalent to [sync.Pool.Get]. +func (p *Pool[T]) Get() T { + p.once.Do(func() { // Do() guarantees at least once, not just only once + p.pool.New = func() any { return p.New() } + }) + return p.pool.Get().(T) //nolint:forcetypeassert +} + +// Put is equivalent to [sync.Pool.Put]. 
+func (p *Pool[T]) Put(t T) { + p.pool.Put(t) +} From 5eb68df8f2290ea39a1d85a7e10426a5e8c25237 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Tue, 26 Nov 2024 09:48:10 +0000 Subject: [PATCH 9/9] refactor!: remove `<-sf.stop` case in task loop --- core/state/trie_prefetcher.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index e4a12984f75..275f20b94b6 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -339,14 +339,14 @@ func (sf *subfetcher) loop() { sf.lock.Unlock() // Prefetch any tasks until the loop is interrupted - for i, task := range tasks { + for _, task := range tasks { select { - case <-sf.stop: - // If termination is requested, add any leftover back and return - sf.lock.Lock() - sf.tasks = append(sf.tasks, tasks[i:]...) - sf.lock.Unlock() - return + //libevm:start + // + // The <-sf.stop case has been removed, in keeping with the equivalent change below. Future geth + // versions also remove it so our modification here can be undone when merging upstream. + // + //libevm:end case ch := <-sf.copy: // Somebody wants a copy of the current trie, grant them