Skip to content

Commit

Permalink
core/state/snapshot: decouple state generation from disk layer
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Jun 18, 2024
1 parent 7cf6a63 commit 76d4609
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 312 deletions.
53 changes: 34 additions & 19 deletions core/state/snapshot/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ const (
)

// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes.
// for logging purposes. This data structure is used throughout the entire
// lifecycle of the snapshot generation process and is shared across multiple
// generation cycles.
type generatorStats struct {
origin uint64 // Origin prefix where generation started
start time.Time // Timestamp when generation started
Expand All @@ -46,9 +48,9 @@ type generatorStats struct {
storage common.StorageSize // Total account and storage slot size(generation or recovery)
}

// Log creates a contextual log with the given message and the context pulled
// log creates a contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
func (gs *generatorStats) log(msg string, root common.Hash, marker []byte) {
var ctx []interface{}
if root != (common.Hash{}) {
ctx = append(ctx, []interface{}{"root", root}...)
Expand Down Expand Up @@ -85,29 +87,42 @@ func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
log.Info(msg, ctx...)
}

// generatorContext carries a few global values to be shared by all generation functions.
// generatorContext holds several global fields that are used throughout the
// current generation cycle.
type generatorContext struct {
stats *generatorStats // Generation statistic collection
db ethdb.KeyValueStore // Key-value store containing the snapshot data
account *holdableIterator // Iterator of account snapshot data
storage *holdableIterator // Iterator of storage snapshot data
batch ethdb.Batch // Database batch for writing batch data atomically
logged time.Time // The timestamp when last generation progress was displayed
root common.Hash // State root of the generation target
marker []byte // Generation progress marker
setMarker func(marker []byte) // Function to notify the generation progress
account *holdableIterator // Iterator of account snapshot data
storage *holdableIterator // Iterator of storage snapshot data
db ethdb.KeyValueStore // Key-value store containing the snapshot data
batch ethdb.Batch // Database batch for writing data atomically
logged time.Time // The timestamp when last generation progress was displayed
}

// newGeneratorContext initializes the context for generation.
func newGeneratorContext(stats *generatorStats, db ethdb.KeyValueStore, accMarker []byte, storageMarker []byte) *generatorContext {
func newGeneratorContext(root common.Hash, marker []byte, setMarker func(marker []byte), db ethdb.KeyValueStore) *generatorContext {
ctx := &generatorContext{
stats: stats,
db: db,
batch: db.NewBatch(),
logged: time.Now(),
root: root,
marker: marker,
setMarker: setMarker,
db: db,
batch: db.NewBatch(),
logged: time.Now(),
}
accMarker, storageMarker := splitMarker(marker)
ctx.openIterator(snapAccount, accMarker)
ctx.openIterator(snapStorage, storageMarker)
return ctx
}

// setGenMarker updates the generation progress marker locally and cascades it
// to associated disk layer.
func (ctx *generatorContext) setGenMarker(marker []byte) {
ctx.marker = marker
ctx.setMarker(marker)
}

// openIterator constructs global account and storage snapshot iterators
// at the interrupted position. These iterators should be reopened from time
// to time to avoid blocking leveldb compaction for a long time.
Expand Down Expand Up @@ -164,7 +179,7 @@ func (ctx *generatorContext) iterator(kind string) *holdableIterator {
// the specified account. When the iterator touches the storage entry which
// is located in or outside the given account, it stops and holds the current
// iterated element locally.
func (ctx *generatorContext) removeStorageBefore(account common.Hash) {
func (ctx *generatorContext) removeStorageBefore(account common.Hash) uint64 {
var (
count uint64
start = time.Now()
Expand All @@ -183,8 +198,8 @@ func (ctx *generatorContext) removeStorageBefore(account common.Hash) {
ctx.batch.Reset()
}
}
ctx.stats.dangling += count
snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
return count
}

// removeStorageAt deletes all storage entries which are located in the specified
Expand Down Expand Up @@ -221,7 +236,7 @@ func (ctx *generatorContext) removeStorageAt(account common.Hash) error {

// removeStorageLeft deletes all storage entries which are located after
// the current iterator position.
func (ctx *generatorContext) removeStorageLeft() {
func (ctx *generatorContext) removeStorageLeft() uint64 {
var (
count uint64
start = time.Now()
Expand All @@ -235,7 +250,7 @@ func (ctx *generatorContext) removeStorageLeft() {
ctx.batch.Reset()
}
}
ctx.stats.dangling += count
snapDanglingStorageMeter.Mark(int64(count))
snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
return count
}
36 changes: 27 additions & 9 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/triedb"
)

// diskLayer is a low level persistent snapshot built on top of a key-value store.
type diskLayer struct {
diskdb ethdb.KeyValueStore // Key-value store containing the base snapshot
triedb *triedb.Database // Trie node cache for reconstruction purposes
cache *fastcache.Cache // Cache to avoid hitting the disk for direct access

root common.Hash // Root hash of the base snapshot
stale bool // Signals that the layer became stale (state progressed)
root common.Hash // Root hash of the base snapshot
stale bool // Signals that the layer became stale (state progressed)
genMarker []byte // Marker for the state that's indexed during initial layer generation
lock sync.RWMutex // Lock to protect stale and genMarker

genMarker []byte // Marker for the state that's indexed during initial layer generation
genPending chan struct{} // Notification channel when generation is done (test synchronicity)
genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer

lock sync.RWMutex
// State snapshot generator, set only if background generation is granted.
// Normally, a non-nil generator indicates that background snapshot generation
// is actively running, except for very short periods during restarts.
generator *generator
}

// Release releases underlying resources; specifically the fastcache requires
Expand Down Expand Up @@ -74,6 +73,17 @@ func (dl *diskLayer) Stale() bool {
return dl.stale
}

// markStale sets the stale flag as true.
func (dl *diskLayer) markStale() {
dl.lock.Lock()
defer dl.lock.Unlock()

if dl.stale {
panic("disk layer is stale")
}
dl.stale = true
}

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
func (dl *diskLayer) Account(hash common.Hash) (*types.SlimAccount, error) {
Expand Down Expand Up @@ -175,3 +185,11 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro
func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
return newDiffLayer(dl, blockHash, destructs, accounts, storage)
}

// setGenMarker updates the generation progress marker with provided value.
func (dl *diskLayer) setGenMarker(marker []byte) {
dl.lock.Lock()
defer dl.lock.Unlock()

dl.genMarker = marker
}
76 changes: 0 additions & 76 deletions core/state/snapshot/disklayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
)

// reverse reverses the contents of a byte slice. It's used to update random accs
Expand Down Expand Up @@ -426,81 +425,6 @@ func TestDiskPartialMerge(t *testing.T) {
}
}

// Tests that when the bottom-most diff layer is merged into the disk
// layer whether the corresponding generator is persisted correctly.
func TestDiskGeneratorPersistence(t *testing.T) {
var (
accOne = randomHash()
accTwo = randomHash()
accOneSlotOne = randomHash()
accOneSlotTwo = randomHash()

accThree = randomHash()
accThreeSlot = randomHash()
baseRoot = randomHash()
diffRoot = randomHash()
diffTwoRoot = randomHash()
genMarker = append(randomHash().Bytes(), randomHash().Bytes()...)
)
// Testing scenario 1, the disk layer is still under the construction.
db := rawdb.NewMemoryDatabase()

rawdb.WriteAccountSnapshot(db, accOne, accOne[:])
rawdb.WriteStorageSnapshot(db, accOne, accOneSlotOne, accOneSlotOne[:])
rawdb.WriteStorageSnapshot(db, accOne, accOneSlotTwo, accOneSlotTwo[:])
rawdb.WriteSnapshotRoot(db, baseRoot)

// Create a disk layer based on all above updates
snaps := &Tree{
layers: map[common.Hash]snapshot{
baseRoot: &diskLayer{
diskdb: db,
cache: fastcache.New(500 * 1024),
root: baseRoot,
genMarker: genMarker,
},
},
}
// Modify or delete some accounts, flatten everything onto disk
if err := snaps.Update(diffRoot, baseRoot, nil, map[common.Hash][]byte{
accTwo: accTwo[:],
}, nil); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob := rawdb.ReadSnapshotGenerator(db)
var generator journalGenerator
if err := rlp.DecodeBytes(blob, &generator); err != nil {
t.Fatalf("Failed to decode snapshot generator %v", err)
}
if !bytes.Equal(generator.Marker, genMarker) {
t.Fatalf("Generator marker is not matched")
}
// Test scenario 2, the disk layer is fully generated
// Modify or delete some accounts, flatten everything onto disk
if err := snaps.Update(diffTwoRoot, diffRoot, nil, map[common.Hash][]byte{
accThree: accThree.Bytes(),
}, map[common.Hash]map[common.Hash][]byte{
accThree: {accThreeSlot: accThreeSlot.Bytes()},
}); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer)
diskLayer.genMarker = nil // Construction finished
if err := snaps.Cap(diffTwoRoot, 0); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob = rawdb.ReadSnapshotGenerator(db)
if err := rlp.DecodeBytes(blob, &generator); err != nil {
t.Fatalf("Failed to decode snapshot generator %v", err)
}
if len(generator.Marker) != 0 {
t.Fatalf("Failed to update snapshot generator")
}
}

// Tests that merging something into a disk layer persists it into the database
// and invalidates any previously written and cached values, discarding anything
// after the in-progress generation marker.
Expand Down
Loading

0 comments on commit 76d4609

Please sign in to comment.