Skip to content

Commit

Permalink
blockchain: Add Upgrade to UtxoBackend.
Browse files Browse the repository at this point in the history
This adds the Upgrade method to the UtxoBackend interface and
LevelDbUtxoBackend implementation.

A summary of the changes is as follows:
- Add Upgrade method to UtxoBackend interface
- Add Upgrade method to LevelDbUtxoBackend
- Remove initUtxoState and instead handle upgrading through the
  UtxoCache Initialize method
- Pass the UTXO backend into upgradeUtxoDb rather than the entire
  BlockChain instance
  • Loading branch information
rstaudt2 committed May 22, 2021
1 parent 7aa663f commit 76625b4
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 51 deletions.
2 changes: 1 addition & 1 deletion blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2264,7 +2264,7 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) {

// Initialize the UTXO state. This entails running any database migrations as
// necessary as well as initializing the UTXO cache.
if err := b.initUtxoState(ctx, config.UtxoBackend); err != nil {
if err := b.utxoCache.Initialize(ctx, &b, b.bestChain.tip()); err != nil {
return nil, err
}

Expand Down
39 changes: 21 additions & 18 deletions blockchain/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -3123,7 +3123,7 @@ func upgradeSpendJournalToVersion3(ctx context.Context, b *BlockChain) error {
}

// upgradeUtxoSetToVersion3 upgrades a version 2 utxo set to version 3.
func upgradeUtxoSetToVersion3(ctx context.Context, b *BlockChain,
func upgradeUtxoSetToVersion3(ctx context.Context, db database.DB,
utxoBackend UtxoBackend) error {

if interruptRequested(ctx) {
Expand All @@ -3134,7 +3134,7 @@ func upgradeUtxoSetToVersion3(ctx context.Context, b *BlockChain,
start := time.Now()

// Migrate the utxoset to version 3.
err := migrateUtxoSetVersion2To3(ctx, b.db)
err := migrateUtxoSetVersion2To3(ctx, db)
if err != nil {
return err
}
Expand Down Expand Up @@ -3191,7 +3191,7 @@ func upgradeToVersion10(ctx context.Context, db database.DB, dbInfo *databaseInf

// separateUtxoDatabase moves the UTXO set and state from the block database to
// the UTXO database.
func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {
func separateUtxoDatabase(ctx context.Context, db database.DB, utxoDb database.DB) error {
// Hardcoded bucket and key names so updates do not affect old upgrades.
v1UtxoSetStateKeyName := []byte("utxosetstate")
v3UtxoSetBucketName := []byte("utxosetv3")
Expand Down Expand Up @@ -3271,8 +3271,8 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {
// Migrate all entries in batches for the reasons mentioned above.
var isFullyDone bool
for !isFullyDone {
err := b.db.View(func(dbTx database.Tx) error {
err := b.utxoDb.Update(func(utxoDbTx database.Tx) error {
err := db.View(func(dbTx database.Tx) error {
err := utxoDb.Update(func(utxoDbTx database.Tx) error {
var err error
isFullyDone, err = doBatch(dbTx, utxoDbTx)
if errors.Is(err, errInterruptRequested) ||
Expand Down Expand Up @@ -3307,7 +3307,7 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {

// Get the UTXO set state from the old database.
var serialized []byte
err = b.db.View(func(dbTx database.Tx) error {
err = db.View(func(dbTx database.Tx) error {
serialized = dbTx.Metadata().Get(v1UtxoSetStateKeyName)
return nil
})
Expand All @@ -3317,7 +3317,7 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {

if serialized != nil {
// Set the UTXO set state in the new database.
err = b.utxoDb.Update(func(utxoDbTx database.Tx) error {
err = utxoDb.Update(func(utxoDbTx database.Tx) error {
return utxoDbTx.Metadata().Put(v1UtxoSetStateKeyName, serialized)
})
if err != nil {
Expand All @@ -3327,14 +3327,14 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {

// Force the UTXO database to flush to disk before removing the UTXO set
// and UTXO state from the block database.
err = b.utxoDb.Flush()
err = utxoDb.Flush()
if err != nil {
return err
}

if serialized != nil {
// Delete the UTXO set state from the old database.
err = b.db.Update(func(dbTx database.Tx) error {
err = db.Update(func(dbTx database.Tx) error {
return dbTx.Metadata().Delete(v1UtxoSetStateKeyName)
})
if err != nil {
Expand All @@ -3345,11 +3345,11 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {
// Drop UTXO set from the old database.
log.Info("Removing old UTXO set...")
start = time.Now()
err = incrementalFlatDrop(ctx, b.db, v3UtxoSetBucketName, "old UTXO set")
err = incrementalFlatDrop(ctx, db, v3UtxoSetBucketName, "old UTXO set")
if err != nil {
return err
}
err = b.db.Update(func(dbTx database.Tx) error {
err = db.Update(func(dbTx database.Tx) error {
return dbTx.Metadata().DeleteBucket(v3UtxoSetBucketName)
})
if err != nil {
Expand All @@ -3360,7 +3360,7 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error {

// Force the block database to flush to disk to avoid rerunning the migration
// in the event of an unclean shutown.
return b.db.Flush()
return db.Flush()
}

// checkDBTooOldToUpgrade returns an ErrDBTooOldToUpgrade error if the provided
Expand Down Expand Up @@ -3505,9 +3505,12 @@ func upgradeSpendJournal(ctx context.Context, b *BlockChain) error {
// so utxo set upgrades prior to version 3 are handled in the block database
// upgrade path.
//
// NOTE: The database info housed by the passed blockchain instance will be
// updated with the latest versions.
func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend) error {
// NOTE: The database info housed in the passed block database instance and
// backend info housed in the passed utxo backend instance will be updated with
// the latest versions.
func upgradeUtxoDb(ctx context.Context, db database.DB,
utxoBackend *LevelDbUtxoBackend) error {

// Fetch the backend versioning info.
utxoDbInfo, err := utxoBackend.FetchInfo()
if err != nil {
Expand All @@ -3516,15 +3519,15 @@ func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend)

// Update to a version 3 utxo set as needed.
if utxoDbInfo.utxoVer == 2 {
if err := upgradeUtxoSetToVersion3(ctx, b, utxoBackend); err != nil {
if err := upgradeUtxoSetToVersion3(ctx, db, utxoBackend); err != nil {
return err
}
}

// Check if the block database contains the UTXO set or state. If it does,
// move the UTXO set and state from the block database to the UTXO database.
blockDbUtxoSetExists := false
b.db.View(func(dbTx database.Tx) error {
db.View(func(dbTx database.Tx) error {
if dbTx.Metadata().Bucket([]byte("utxosetv3")) != nil ||
dbTx.Metadata().Get([]byte("utxosetstate")) != nil {

Expand All @@ -3533,7 +3536,7 @@ func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend)
return nil
})
if blockDbUtxoSetExists {
err := separateUtxoDatabase(ctx, b)
err := separateUtxoDatabase(ctx, db, utxoBackend.db)
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions blockchain/utxobackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package blockchain

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -125,6 +126,10 @@ type UtxoBackend interface {
// PutUtxos atomically updates the UTXO set with the entries from the provided
// map along with the current state.
PutUtxos(utxos map[wire.OutPoint]*UtxoEntry, state *UtxoSetState) error

// Upgrade upgrades the UTXO backend by applying all possible upgrades
// iteratively as needed.
Upgrade(ctx context.Context, b *BlockChain) error
}

// LevelDbUtxoBackend implements the UtxoBackend interface using an underlying
Expand Down Expand Up @@ -660,3 +665,10 @@ func (l *LevelDbUtxoBackend) PutUtxos(utxos map[wire.OutPoint]*UtxoEntry,
// where to start from when recovering on startup.
return l.db.Flush()
}

// Upgrade upgrades the UTXO backend by applying all possible upgrades
// iteratively as needed.
func (l *LevelDbUtxoBackend) Upgrade(ctx context.Context, b *BlockChain) error {
// Upgrade the UTXO database as needed.
return upgradeUtxoDb(ctx, b.db, l)
}
25 changes: 17 additions & 8 deletions blockchain/utxocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package blockchain

import (
"context"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -101,9 +102,10 @@ type UtxoCacher interface {
// always be in sync with the best block.
FetchStats() (*UtxoStats, error)

// Initialize initializes the utxo cache by ensuring that the utxo set is
// caught up to the tip of the best chain.
Initialize(b *BlockChain, tip *blockNode) error
// Initialize initializes the utxo cache and underlying utxo backend. This
// entails running any database migrations as well as ensuring that the utxo
// set is caught up to the tip of the best chain.
Initialize(ctx context.Context, b *BlockChain, tip *blockNode) error

// MaybeFlush conditionally flushes the cache to the backend. A flush can be
// forced by setting the force flush parameter.
Expand Down Expand Up @@ -695,19 +697,26 @@ func (c *UtxoCache) MaybeFlush(bestHash *chainhash.Hash, bestHeight uint32,
return nil
}

// Initialize initializes the utxo cache by ensuring that the utxo set is caught
// up to the tip of the best chain.
// Initialize initializes the utxo cache and underlying utxo backend. This
// entails running any database migrations as well as ensuring that the utxo set
// is caught up to the tip of the best chain.
//
// Since the cache is only flushed to the backend periodically, the utxo set
// may not be caught up to the tip of the best chain. This function catches the
// utxo set up by replaying all blocks from the block after the block that was
// last flushed to the tip block through the cache.
//
// This function should only be called during initialization.
func (c *UtxoCache) Initialize(b *BlockChain, tip *blockNode) error {
func (c *UtxoCache) Initialize(ctx context.Context, b *BlockChain, tip *blockNode) error {
log.Infof("UTXO cache initializing (max size: %d MiB)...",
c.maxSize/1024/1024)

// Upgrade the UTXO backend as needed.
err := c.backend.Upgrade(ctx, b)
if err != nil {
return err
}

// Fetch the utxo set state from the backend.
state, err := c.backend.FetchState()
if err != nil {
Expand Down Expand Up @@ -743,14 +752,14 @@ func (c *UtxoCache) Initialize(b *BlockChain, tip *blockNode) error {
lastFlushedNode := b.index.LookupNode(&state.lastFlushHash)
if lastFlushedNode == nil {
// panic if the last flushed block node does not exist. This should never
// happen unless the database is corrupted.
// happen unless the backend is corrupted.
panicf("last flushed block node hash %v (height %v) does not exist",
state.lastFlushHash, state.lastFlushHeight)
}
fork := b.bestChain.FindFork(lastFlushedNode)

// Disconnect all of the blocks back to the point of the fork. This entails
// loading the blocks and their associated spent txos from the database and
// loading the blocks and their associated spent txos from the backend and
// using that information to unspend all of the spent txos and remove the
// utxos created by the blocks. In addition, if a block votes against its
// parent, the regular transactions are reconnected.
Expand Down
32 changes: 26 additions & 6 deletions blockchain/utxocache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package blockchain

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -1065,15 +1066,24 @@ func TestInitialize(t *testing.T) {
// resetTestUtxoCache replaces the current utxo cache with a new test utxo
// cache and calls initialize on it. This simulates an empty utxo cache that
// gets created and initialized at startup.
backend := createTestUtxoBackend(t)
err := backend.InitInfo(g.chain.dbInfo.version)
if err != nil {
t.Fatalf("error initializing backend info: %v", err)
}
resetTestUtxoCache := func() *testUtxoCache {
testUtxoCache := newTestUtxoCache(&UtxoCacheConfig{
Backend: NewLevelDbUtxoBackend(g.chain.utxoDb),
DB: g.chain.utxoDb,
Backend: backend,
DB: backend.db,
FlushBlockDB: g.chain.db.Flush,
MaxSize: 100 * 1024 * 1024, // 100 MiB
})
g.chain.utxoCache = testUtxoCache
testUtxoCache.Initialize(g.chain, g.chain.bestChain.Tip())
err := testUtxoCache.Initialize(context.Background(), g.chain,
g.chain.bestChain.Tip())
if err != nil {
t.Fatalf("error initializing test cache: %v", err)
}
return testUtxoCache
}

Expand Down Expand Up @@ -1179,7 +1189,7 @@ func TestInitialize(t *testing.T) {
g.ForceTipReorg("b1", "b1a")

// Restore the spend journal entry for block b1.
err := g.chain.db.Update(func(dbTx database.Tx) error {
err = g.chain.db.Update(func(dbTx database.Tx) error {
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
return spendBucket.Put(b1Hash[:], serialized)
})
Expand Down Expand Up @@ -1210,12 +1220,22 @@ func TestShutdownUtxoCache(t *testing.T) {

// Replace the chain utxo cache with a test cache so that flushing can be
// disabled.
backend := createTestUtxoBackend(t)
testUtxoCache := newTestUtxoCache(&UtxoCacheConfig{
Backend: NewLevelDbUtxoBackend(g.chain.utxoDb),
DB: g.chain.utxoDb,
Backend: backend,
DB: backend.db,
FlushBlockDB: g.chain.db.Flush,
MaxSize: 100 * 1024 * 1024, // 100 MiB
})
err := backend.InitInfo(g.chain.dbInfo.version)
if err != nil {
t.Fatalf("error initializing backend info: %v", err)
}
err = testUtxoCache.Initialize(context.Background(), g.chain,
g.chain.bestChain.Tip())
if err != nil {
t.Fatalf("error initializing test cache: %v", err)
}
g.chain.utxoCache = testUtxoCache

// ---------------------------------------------------------------------------
Expand Down
18 changes: 0 additions & 18 deletions blockchain/utxoio.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package blockchain

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -302,20 +301,3 @@ func deserializeUtxoSetState(serialized []byte) (*UtxoSetState, error) {
lastFlushHash: hash,
}, nil
}

// initUtxoState attempts to load and initialize the UTXO state from the
// database. This entails running any database migrations as necessary as well
// as initializing the UTXO cache.
func (b *BlockChain) initUtxoState(ctx context.Context,
utxoBackend UtxoBackend) error {

// Upgrade the UTXO database as needed.
err := upgradeUtxoDb(ctx, b, utxoBackend)
if err != nil {
return err
}

// Initialize the UTXO cache to ensure that the state of the UTXO set is
// caught up to the tip of the best chain.
return b.utxoCache.Initialize(b, b.bestChain.tip())
}

0 comments on commit 76625b4

Please sign in to comment.