From 76625b4a7ceef75fb91ad370c88f64305c90b877 Mon Sep 17 00:00:00 2001 From: Ryan Staudt Date: Tue, 18 May 2021 05:36:32 -0500 Subject: [PATCH] blockchain: Add Upgrade to UtxoBackend. This adds the Upgrade method to the UtxoBackend interface and LevelDbUtxoBackend implementation. A summary of the changes is as follows: - Add Upgrade method to UtxoBackend interface - Add Upgrade method to LevelDbUtxoBackend - Remove initUtxoState and instead handle upgrading through the UtxoCache Initialize method - Pass the UTXO backend into upgradeUtxoDb rather than the entire BlockChain instance --- blockchain/chain.go | 2 +- blockchain/upgrade.go | 39 +++++++++++++++++++----------------- blockchain/utxobackend.go | 12 +++++++++++ blockchain/utxocache.go | 25 +++++++++++++++-------- blockchain/utxocache_test.go | 32 +++++++++++++++++++++++------ blockchain/utxoio.go | 18 ----------------- 6 files changed, 77 insertions(+), 51 deletions(-) diff --git a/blockchain/chain.go b/blockchain/chain.go index 7f004d8f37..d0c05264d0 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -2264,7 +2264,7 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { // Initialize the UTXO state. This entails running any database migrations as // necessary as well as initializing the UTXO cache. - if err := b.initUtxoState(ctx, config.UtxoBackend); err != nil { + if err := b.utxoCache.Initialize(ctx, &b, b.bestChain.tip()); err != nil { return nil, err } diff --git a/blockchain/upgrade.go b/blockchain/upgrade.go index 3786fcfb8c..75b4dfcdf8 100644 --- a/blockchain/upgrade.go +++ b/blockchain/upgrade.go @@ -3123,7 +3123,7 @@ func upgradeSpendJournalToVersion3(ctx context.Context, b *BlockChain) error { } // upgradeUtxoSetToVersion3 upgrades a version 2 utxo set to version 3. -func upgradeUtxoSetToVersion3(ctx context.Context, b *BlockChain, +func upgradeUtxoSetToVersion3(ctx context.Context, db database.DB, utxoBackend UtxoBackend) error { if interruptRequested(ctx) { @@ -3134,7 +3134,7 @@ func upgradeUtxoSetToVersion3(ctx context.Context, b *BlockChain, start := time.Now() // Migrate the utxoset to version 3. - err := migrateUtxoSetVersion2To3(ctx, b.db) + err := migrateUtxoSetVersion2To3(ctx, db) if err != nil { return err } @@ -3191,7 +3191,7 @@ func upgradeToVersion10(ctx context.Context, db database.DB, dbInfo *databaseInf // separateUtxoDatabase moves the UTXO set and state from the block database to // the UTXO database. -func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { +func separateUtxoDatabase(ctx context.Context, db database.DB, utxoDb database.DB) error { // Hardcoded bucket and key names so updates do not affect old upgrades. v1UtxoSetStateKeyName := []byte("utxosetstate") v3UtxoSetBucketName := []byte("utxosetv3") @@ -3271,8 +3271,8 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { // Migrate all entries in batches for the reasons mentioned above. var isFullyDone bool for !isFullyDone { - err := b.db.View(func(dbTx database.Tx) error { - err := b.utxoDb.Update(func(utxoDbTx database.Tx) error { + err := db.View(func(dbTx database.Tx) error { + err := utxoDb.Update(func(utxoDbTx database.Tx) error { var err error isFullyDone, err = doBatch(dbTx, utxoDbTx) if errors.Is(err, errInterruptRequested) || @@ -3307,7 +3307,7 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { // Get the UTXO set state from the old database. var serialized []byte - err = b.db.View(func(dbTx database.Tx) error { + err = db.View(func(dbTx database.Tx) error { serialized = dbTx.Metadata().Get(v1UtxoSetStateKeyName) return nil }) @@ -3317,7 +3317,7 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { if serialized != nil { // Set the UTXO set state in the new database. - err = b.utxoDb.Update(func(utxoDbTx database.Tx) error { + err = utxoDb.Update(func(utxoDbTx database.Tx) error { return utxoDbTx.Metadata().Put(v1UtxoSetStateKeyName, serialized) }) if err != nil { @@ -3327,14 +3327,14 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { // Force the UTXO database to flush to disk before removing the UTXO set // and UTXO state from the block database. - err = b.utxoDb.Flush() + err = utxoDb.Flush() if err != nil { return err } if serialized != nil { // Delete the UTXO set state from the old database. - err = b.db.Update(func(dbTx database.Tx) error { + err = db.Update(func(dbTx database.Tx) error { return dbTx.Metadata().Delete(v1UtxoSetStateKeyName) }) if err != nil { @@ -3345,11 +3345,11 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { // Drop UTXO set from the old database. log.Info("Removing old UTXO set...") start = time.Now() - err = incrementalFlatDrop(ctx, b.db, v3UtxoSetBucketName, "old UTXO set") + err = incrementalFlatDrop(ctx, db, v3UtxoSetBucketName, "old UTXO set") if err != nil { return err } - err = b.db.Update(func(dbTx database.Tx) error { + err = db.Update(func(dbTx database.Tx) error { return dbTx.Metadata().DeleteBucket(v3UtxoSetBucketName) }) if err != nil { @@ -3360,7 +3360,7 @@ func separateUtxoDatabase(ctx context.Context, b *BlockChain) error { // Force the block database to flush to disk to avoid rerunning the migration // in the event of an unclean shutown. - return b.db.Flush() + return db.Flush() } // checkDBTooOldToUpgrade returns an ErrDBTooOldToUpgrade error if the provided @@ -3505,9 +3505,12 @@ func upgradeSpendJournal(ctx context.Context, b *BlockChain) error { // so utxo set upgrades prior to version 3 are handled in the block database // upgrade path. // -// NOTE: The database info housed by the passed blockchain instance will be -// updated with the latest versions. -func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend) error { +// NOTE: The database info housed in the passed block database instance and +// backend info housed in the passed utxo backend instance will be updated with +// the latest versions. +func upgradeUtxoDb(ctx context.Context, db database.DB, + utxoBackend *LevelDbUtxoBackend) error { + // Fetch the backend versioning info. utxoDbInfo, err := utxoBackend.FetchInfo() if err != nil { @@ -3516,7 +3519,7 @@ func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend) // Update to a version 3 utxo set as needed. if utxoDbInfo.utxoVer == 2 { - if err := upgradeUtxoSetToVersion3(ctx, b, utxoBackend); err != nil { + if err := upgradeUtxoSetToVersion3(ctx, db, utxoBackend); err != nil { return err } } @@ -3524,7 +3527,7 @@ func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend) // Check if the block database contains the UTXO set or state. If it does, // move the UTXO set and state from the block database to the UTXO database. blockDbUtxoSetExists := false - b.db.View(func(dbTx database.Tx) error { + db.View(func(dbTx database.Tx) error { if dbTx.Metadata().Bucket([]byte("utxosetv3")) != nil || dbTx.Metadata().Get([]byte("utxosetstate")) != nil { @@ -3533,7 +3536,7 @@ func upgradeUtxoDb(ctx context.Context, b *BlockChain, utxoBackend UtxoBackend) return nil }) if blockDbUtxoSetExists { - err := separateUtxoDatabase(ctx, b) + err := separateUtxoDatabase(ctx, db, utxoBackend.db) if err != nil { return err } diff --git a/blockchain/utxobackend.go b/blockchain/utxobackend.go index 0c90479734..5c6d9a5636 100644 --- a/blockchain/utxobackend.go +++ b/blockchain/utxobackend.go @@ -5,6 +5,7 @@ package blockchain import ( + "context" "errors" "fmt" "os" @@ -125,6 +126,10 @@ type UtxoBackend interface { // PutUtxos atomically updates the UTXO set with the entries from the provided // map along with the current state. PutUtxos(utxos map[wire.OutPoint]*UtxoEntry, state *UtxoSetState) error + + // Upgrade upgrades the UTXO backend by applying all possible upgrades + // iteratively as needed. + Upgrade(ctx context.Context, b *BlockChain) error } // LevelDbUtxoBackend implements the UtxoBackend interface using an underlying @@ -660,3 +665,10 @@ func (l *LevelDbUtxoBackend) PutUtxos(utxos map[wire.OutPoint]*UtxoEntry, // where to start from when recovering on startup. return l.db.Flush() } + +// Upgrade upgrades the UTXO backend by applying all possible upgrades +// iteratively as needed. +func (l *LevelDbUtxoBackend) Upgrade(ctx context.Context, b *BlockChain) error { + // Upgrade the UTXO database as needed. + return upgradeUtxoDb(ctx, b.db, l) +} diff --git a/blockchain/utxocache.go b/blockchain/utxocache.go index eb5336a4cc..642fa76a30 100644 --- a/blockchain/utxocache.go +++ b/blockchain/utxocache.go @@ -5,6 +5,7 @@ package blockchain import ( + "context" "fmt" "math" "sync" @@ -101,9 +102,10 @@ type UtxoCacher interface { // always be in sync with the best block. FetchStats() (*UtxoStats, error) - // Initialize initializes the utxo cache by ensuring that the utxo set is - // caught up to the tip of the best chain. - Initialize(b *BlockChain, tip *blockNode) error + // Initialize initializes the utxo cache and underlying utxo backend. This + // entails running any database migrations as well as ensuring that the utxo + // set is caught up to the tip of the best chain. + Initialize(ctx context.Context, b *BlockChain, tip *blockNode) error // MaybeFlush conditionally flushes the cache to the backend. A flush can be // forced by setting the force flush parameter. @@ -695,8 +697,9 @@ func (c *UtxoCache) MaybeFlush(bestHash *chainhash.Hash, bestHeight uint32, return nil } -// Initialize initializes the utxo cache by ensuring that the utxo set is caught -// up to the tip of the best chain. +// Initialize initializes the utxo cache and underlying utxo backend. This +// entails running any database migrations as well as ensuring that the utxo set +// is caught up to the tip of the best chain. // // Since the cache is only flushed to the backend periodically, the utxo set // may not be caught up to the tip of the best chain. This function catches the @@ -704,10 +707,16 @@ func (c *UtxoCache) MaybeFlush(bestHash *chainhash.Hash, bestHeight uint32, // last flushed to the tip block through the cache. // // This function should only be called during initialization. -func (c *UtxoCache) Initialize(b *BlockChain, tip *blockNode) error { +func (c *UtxoCache) Initialize(ctx context.Context, b *BlockChain, tip *blockNode) error { log.Infof("UTXO cache initializing (max size: %d MiB)...", c.maxSize/1024/1024) + // Upgrade the UTXO backend as needed. + err := c.backend.Upgrade(ctx, b) + if err != nil { + return err + } + // Fetch the utxo set state from the backend. state, err := c.backend.FetchState() if err != nil { @@ -743,14 +752,14 @@ func (c *UtxoCache) Initialize(b *BlockChain, tip *blockNode) error { lastFlushedNode := b.index.LookupNode(&state.lastFlushHash) if lastFlushedNode == nil { // panic if the last flushed block node does not exist. This should never - // happen unless the database is corrupted. + // happen unless the backend is corrupted. panicf("last flushed block node hash %v (height %v) does not exist", state.lastFlushHash, state.lastFlushHeight) } fork := b.bestChain.FindFork(lastFlushedNode) // Disconnect all of the blocks back to the point of the fork. This entails - // loading the blocks and their associated spent txos from the database and + // loading the blocks and their associated spent txos from the backend and // using that information to unspend all of the spent txos and remove the // utxos created by the blocks. In addition, if a block votes against its // parent, the regular transactions are reconnected. diff --git a/blockchain/utxocache_test.go b/blockchain/utxocache_test.go index b5f8202710..8929c5f040 100644 --- a/blockchain/utxocache_test.go +++ b/blockchain/utxocache_test.go @@ -5,6 +5,7 @@ package blockchain import ( + "context" "reflect" "testing" "time" @@ -1065,15 +1066,24 @@ func TestInitialize(t *testing.T) { // resetTestUtxoCache replaces the current utxo cache with a new test utxo // cache and calls initialize on it. This simulates an empty utxo cache that // gets created and initialized at startup. + backend := createTestUtxoBackend(t) + err := backend.InitInfo(g.chain.dbInfo.version) + if err != nil { + t.Fatalf("error initializing backend info: %v", err) + } resetTestUtxoCache := func() *testUtxoCache { testUtxoCache := newTestUtxoCache(&UtxoCacheConfig{ - Backend: NewLevelDbUtxoBackend(g.chain.utxoDb), - DB: g.chain.utxoDb, + Backend: backend, + DB: backend.db, FlushBlockDB: g.chain.db.Flush, MaxSize: 100 * 1024 * 1024, // 100 MiB }) g.chain.utxoCache = testUtxoCache - testUtxoCache.Initialize(g.chain, g.chain.bestChain.Tip()) + err := testUtxoCache.Initialize(context.Background(), g.chain, + g.chain.bestChain.Tip()) + if err != nil { + t.Fatalf("error initializing test cache: %v", err) + } return testUtxoCache } @@ -1179,7 +1189,7 @@ func TestInitialize(t *testing.T) { g.ForceTipReorg("b1", "b1a") // Restore the spend journal entry for block b1. - err := g.chain.db.Update(func(dbTx database.Tx) error { + err = g.chain.db.Update(func(dbTx database.Tx) error { spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName) return spendBucket.Put(b1Hash[:], serialized) }) @@ -1210,12 +1220,22 @@ func TestShutdownUtxoCache(t *testing.T) { // Replace the chain utxo cache with a test cache so that flushing can be // disabled. + backend := createTestUtxoBackend(t) testUtxoCache := newTestUtxoCache(&UtxoCacheConfig{ - Backend: NewLevelDbUtxoBackend(g.chain.utxoDb), - DB: g.chain.utxoDb, + Backend: backend, + DB: backend.db, FlushBlockDB: g.chain.db.Flush, MaxSize: 100 * 1024 * 1024, // 100 MiB }) + err := backend.InitInfo(g.chain.dbInfo.version) + if err != nil { + t.Fatalf("error initializing backend info: %v", err) + } + err = testUtxoCache.Initialize(context.Background(), g.chain, + g.chain.bestChain.Tip()) + if err != nil { + t.Fatalf("error initializing test cache: %v", err) + } g.chain.utxoCache = testUtxoCache // --------------------------------------------------------------------------- diff --git a/blockchain/utxoio.go b/blockchain/utxoio.go index 92963beb42..6bb34843c9 100644 --- a/blockchain/utxoio.go +++ b/blockchain/utxoio.go @@ -6,7 +6,6 @@ package blockchain import ( - "context" "fmt" "sync" @@ -302,20 +301,3 @@ func deserializeUtxoSetState(serialized []byte) (*UtxoSetState, error) { lastFlushHash: hash, }, nil } - -// initUtxoState attempts to load and initialize the UTXO state from the -// database. This entails running any database migrations as necessary as well -// as initializing the UTXO cache. -func (b *BlockChain) initUtxoState(ctx context.Context, - utxoBackend UtxoBackend) error { - - // Upgrade the UTXO database as needed. - err := upgradeUtxoDb(ctx, b, utxoBackend) - if err != nil { - return err - } - - // Initialize the UTXO cache to ensure that the state of the UTXO set is - // caught up to the tip of the best chain. - return b.utxoCache.Initialize(b, b.bestChain.tip()) -}