From 4c098ac7d73abc6fe8d7d4f36b3f7396dea1a5cd Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 9 Jul 2024 20:16:56 -0400 Subject: [PATCH] removing old store keys by pruning --- store/v2/commitment/iavl/tree.go | 3 +- store/v2/commitment/metadata.go | 73 +++++++++++++++++++- store/v2/commitment/store.go | 92 +++++++++++++++++++++++-- store/v2/commitment/store_test_suite.go | 13 ++++ store/v2/database.go | 4 +- store/v2/go.mod | 2 +- store/v2/go.sum | 2 + store/v2/pruning/manager.go | 54 --------------- store/v2/root/store.go | 20 ++---- store/v2/root/upgrade_test.go | 57 ++++++++------- store/v2/storage/pebbledb/db.go | 80 ++++++++++++++++----- store/v2/storage/rocksdb/db.go | 4 +- store/v2/storage/sqlite/db.go | 41 ++++++++--- store/v2/storage/storage_test_suite.go | 43 ++++++++---- store/v2/storage/store.go | 6 +- 15 files changed, 335 insertions(+), 159 deletions(-) diff --git a/store/v2/commitment/iavl/tree.go b/store/v2/commitment/iavl/tree.go index 06c117ab0ac4..0a348074da56 100644 --- a/store/v2/commitment/iavl/tree.go +++ b/store/v2/commitment/iavl/tree.go @@ -96,7 +96,8 @@ func (t *IavlTree) Get(version uint64, key []byte) ([]byte, error) { // GetLatestVersion returns the latest version of the tree. func (t *IavlTree) GetLatestVersion() (uint64, error) { - return uint64(t.tree.Version()), nil + v, err := t.tree.GetLatestVersion() + return uint64(v), err } // SetInitialVersion sets the initial version of the database. diff --git a/store/v2/commitment/metadata.go b/store/v2/commitment/metadata.go index 286ad1f819e3..12b9b219cbae 100644 --- a/store/v2/commitment/metadata.go +++ b/store/v2/commitment/metadata.go @@ -3,6 +3,7 @@ package commitment import ( "bytes" "fmt" + "slices" corestore "cosmossdk.io/core/store" "cosmossdk.io/store/v2/internal/encoding" @@ -10,8 +11,9 @@ import ( ) const ( - commitInfoKeyFmt = "c/%d" // c/ - latestVersionKey = "c/latest" + commitInfoKeyFmt = "c/%d" // c/ + latestVersionKey = "c/latest" + removedStoreKeyPrefix = "c/removed/" // c/removed// ) type MetadataStore struct { @@ -99,6 +101,73 @@ func (m *MetadataStore) flushCommitInfo(version uint64, cInfo *proof.CommitInfo) return batch.Close() } +func (m *MetadataStore) flushRemovedStoreKeys(version uint64, storeKeys []string) (err error) { + batch := m.kv.NewBatch() + defer func() { + err = batch.Close() + }() + + for _, storeKey := range storeKeys { + key := []byte(fmt.Sprintf("%s%020d/%s", removedStoreKeyPrefix, version, storeKey)) + if err := batch.Set(key, []byte{}); err != nil { + return err + } + } + return batch.WriteSync() +} + +func (m *MetadataStore) getRemovedStoreKeys(version uint64) (storeKeys []string, err error) { + end := []byte(fmt.Sprintf("%s%020d/", removedStoreKeyPrefix, version+1)) + iter, err := m.kv.Iterator([]byte(removedStoreKeyPrefix), end) + if err != nil { + return nil, err + } + defer func() { + if ierr := iter.Close(); ierr != nil { + err = ierr + } + }() + + for ; iter.Valid(); iter.Next() { + storeKey := string(iter.Key()[len(end):]) + storeKeys = append(storeKeys, storeKey) + } + + return +} + +func (m *MetadataStore) deleteRemovedStoreKeys(storeKeys []string, version uint64) (err error) { + batch := m.kv.NewBatch() + defer func() { + if berr := batch.Close(); berr != nil { + err = berr + } + }() + + end := []byte(fmt.Sprintf("%s%020d/", removedStoreKeyPrefix, version+1)) + iter, err := m.kv.Iterator([]byte(removedStoreKeyPrefix), end) + if err != nil { + return err + } + defer func() { + if ierr := iter.Close(); ierr != nil { + err = ierr + } + }() + + for ; iter.Valid(); iter.Next() { + storeKey := string(iter.Key()[len(end):]) + if !slices.Contains(storeKeys, storeKey) { + continue + } + if err := batch.Delete(iter.Key()); err != nil { + return nil + } + } + + return batch.WriteSync() +} + func (m *MetadataStore) deleteCommitInfo(version uint64) error { cInfoKey := []byte(fmt.Sprintf(commitInfoKeyFmt, version)) return m.kv.Delete(cInfoKey) diff --git a/store/v2/commitment/store.go b/store/v2/commitment/store.go index 039b3d479da8..b9d5e6adeaab 100644 --- a/store/v2/commitment/store.go +++ b/store/v2/commitment/store.go @@ -6,6 +6,7 @@ import ( "io" "math" "sort" + "sync" protoio "github.com/cosmos/gogoproto/io" @@ -43,7 +44,7 @@ type CommitStore struct { multiTrees map[string]Tree mountTreeFn MountTreeFn - oldTrees map[string]Tree + oldTrees sync.Map } // NewCommitStore creates a new CommitStore instance. @@ -53,7 +54,7 @@ func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, mountT multiTrees: trees, metadata: NewMetadataStore(db), mountTreeFn: mountTreeFn, - oldTrees: make(map[string]Tree), + oldTrees: sync.Map{}, }, nil } @@ -132,12 +133,14 @@ func (c *CommitStore) LoadVersionAndUpgrade(targetVersion uint64, upgrades *core } newStoreKeys := make([]string, 0, len(c.multiTrees)) + removedStoreKeys := make([]string, 0) for _, storeKey := range storeKeys { // If it has been deleted, remove the tree. if upgrades.IsDeleted(storeKey) { if err := removeTree(storeKey); err != nil { return err } + removedStoreKeys = append(removedStoreKeys, storeKey) continue } @@ -155,10 +158,15 @@ func (c *CommitStore) LoadVersionAndUpgrade(targetVersion uint64, upgrades *core if err := c.migrateKVStore(oldKey, storeKey); err != nil { return err } + removedStoreKeys = append(removedStoreKeys, oldKey) } newStoreKeys = append(newStoreKeys, storeKey) } + if err := c.metadata.flushRemovedStoreKeys(targetVersion, removedStoreKeys); err != nil { + return err + } + return c.loadVersion(targetVersion, newStoreKeys) } @@ -322,13 +330,16 @@ func (c *CommitStore) GetProof(storeKey []byte, version uint64, key []byte) ([]p if !ok { // If the tree is not found, it means the store is an old store that has been // deleted or renamed. We should use the old tree to get the proof. - if tree, ok = c.oldTrees[rawStoreKey]; !ok { + v, ok := c.oldTrees.Load(rawStoreKey) + if !ok { var err error tree, err = c.mountTreeFn(rawStoreKey) if err != nil { return nil, fmt.Errorf("store %s not found: %w", storeKey, err) } - c.oldTrees[rawStoreKey] = tree + c.oldTrees.Store(rawStoreKey, tree) + } else { + tree = v.(Tree) } } @@ -371,15 +382,84 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { // prune the metadata for v := version; v > 0; v-- { if err := c.metadata.deleteCommitInfo(v); err != nil { - return err + ferr = errors.Join(ferr, err) } } - + // prune the trees for _, tree := range c.multiTrees { if err := tree.Prune(version); err != nil { ferr = errors.Join(ferr, err) } } + // prune the removed store keys + if err := c.pruneRemovedStoreKeys(version); err != nil { + ferr = errors.Join(ferr, err) + } + + return ferr +} + +func (c *CommitStore) pruneRemovedStoreKeys(version uint64) (ferr error) { + removedStoreKeys, err := c.metadata.getRemovedStoreKeys(version) + if err != nil { + ferr = errors.Join(ferr, err) + } + for _, storeKey := range removedStoreKeys { + tree, err := c.mountTreeFn(storeKey) + if err != nil { + ferr = errors.Join(ferr, err) + continue + } + + clearKVStore := func(kvStore corestore.KVStoreWithBatch) (err error) { + batch := kvStore.NewBatch() + iter, err := kvStore.Iterator(nil, nil) + if err != nil { + return err + } + defer func() { + err = iter.Close() + err = batch.Close() + }() + + for ; iter.Valid(); iter.Next() { + if err := batch.Delete(iter.Key()); err != nil { + return err + } + bs, err := batch.GetByteSize() + if err != nil { + return err + } + if bs > batchFlushThreshold { + if err := batch.Write(); err != nil { + return err + } + if err := batch.Close(); err != nil { + return err + } + batch = kvStore.NewBatch() + } + } + if err := batch.Write(); err != nil { + return err + } + if err := batch.Close(); err != nil { + return err + } + return nil + } + + if kvStoreGetter, ok := tree.(KVStoreGetter); ok { + kvStore := kvStoreGetter.GetKVStoreWithBatch() + if err := clearKVStore(kvStore); err != nil { + ferr = errors.Join(ferr, err) + } + } + } + + if err := c.metadata.deleteRemovedStoreKeys(removedStoreKeys, version); err != nil { + ferr = errors.Join(ferr, err) + } return ferr } diff --git a/store/v2/commitment/store_test_suite.go b/store/v2/commitment/store_test_suite.go index 2e5d43ea0472..a9457fc70fe2 100644 --- a/store/v2/commitment/store_test_suite.go +++ b/store/v2/commitment/store_test_suite.go @@ -327,4 +327,17 @@ func (s *CommitStoreTestSuite) TestStore_Upgrades() { s.Require().NotNil(proof) } } + + // prune the old stores + s.Require().NoError(commitStore.Prune(latestVersion)) + + // GetProof should fail for the old stores + for _, storeKey := range []string{storeKey1, storeKey3} { + for i := uint64(1); i <= latestVersion; i++ { + for j := 0; j < kvCount; j++ { + _, err := commitStore.GetProof([]byte(storeKey), i, []byte(fmt.Sprintf("key-%d-%d", i, j))) + s.Require().Error(err) + } + } + } } diff --git a/store/v2/database.go b/store/v2/database.go index 73d48a1a8924..667910dbc3f5 100644 --- a/store/v2/database.go +++ b/store/v2/database.go @@ -28,8 +28,8 @@ type VersionedDatabase interface { // UpgradableDatabase defines an API for a versioned database that allows storeKey // upgrades. type UpgradableDatabase interface { - // PruneStoreKey prunes all data associated with the given storeKey. - PruneStoreKey(storeKey []byte) error + // PruneStoreKeys prunes all data associated with the given storeKeys. + PruneStoreKeys(storeKeys []string, version uint64) error // MigrateStoreKey upgrades the storeKey from the old to the new storeKey. MigrateStoreKey(oldStoreKey, newStoreKey []byte) error } diff --git a/store/v2/go.mod b/store/v2/go.mod index 1ffa361f3474..3c2326cbf1f7 100644 --- a/store/v2/go.mod +++ b/store/v2/go.mod @@ -8,7 +8,7 @@ require ( cosmossdk.io/log v1.3.1 github.com/cockroachdb/pebble v1.1.0 github.com/cosmos/gogoproto v1.5.0 - github.com/cosmos/iavl v1.2.0 + github.com/cosmos/iavl v1.2.1-0.20240709120915-7cbae69fd94d github.com/cosmos/ics23/go v0.10.0 github.com/google/btree v1.1.2 github.com/hashicorp/go-metrics v0.5.3 diff --git a/store/v2/go.sum b/store/v2/go.sum index 281f6e1bd5ca..7dd71e8772cc 100644 --- a/store/v2/go.sum +++ b/store/v2/go.sum @@ -38,6 +38,8 @@ github.com/cosmos/gogoproto v1.5.0 h1:SDVwzEqZDDBoslaeZg+dGE55hdzHfgUA40pEanMh52 github.com/cosmos/gogoproto v1.5.0/go.mod h1:iUM31aofn3ymidYG6bUR5ZFrk+Om8p5s754eMUcyp8I= github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM= github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI= +github.com/cosmos/iavl v1.2.1-0.20240709120915-7cbae69fd94d h1:3+1Zpq1HG4pjwRKEGIEF1pcsH/hbx0LuHSBuzzX0kYo= +github.com/cosmos/iavl v1.2.1-0.20240709120915-7cbae69fd94d/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI= github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM= github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/store/v2/pruning/manager.go b/store/v2/pruning/manager.go index 88dbc408aeb8..1e36abe7a6ed 100644 --- a/store/v2/pruning/manager.go +++ b/store/v2/pruning/manager.go @@ -1,12 +1,9 @@ package pruning import ( - corestore "cosmossdk.io/core/store" "cosmossdk.io/store/v2" ) -const batchFlushThreshold = 1 << 16 // 64KB - // Manager is a struct that manages the pruning of old versions of the SC and SS. type Manager struct { // scPruner is the pruner for the SC. @@ -72,54 +69,3 @@ func (m *Manager) SignalCommit(start bool, version uint64) error { return nil } - -// PruneKVStores prunes KVStores which are needed to be pruned by upgrading the -// store key. -func (m *Manager) PruneKVStores(kvStores []corestore.KVStoreWithBatch) error { - for _, kvStore := range kvStores { - if kvStore == nil { - continue - } - if err := func() error { - batch := kvStore.NewBatch() - iter, err := kvStore.Iterator(nil, nil) - if err != nil { - return err - } - defer func() { - _ = iter.Close() - _ = batch.Close() - }() - - for ; iter.Valid(); iter.Next() { - if err := batch.Delete(iter.Key()); err != nil { - return err - } - bs, err := batch.GetByteSize() - if err != nil { - return err - } - if bs > batchFlushThreshold { - if err := batch.Write(); err != nil { - return err - } - if err := batch.Close(); err != nil { - return err - } - batch = kvStore.NewBatch() - } - } - if err := batch.Write(); err != nil { - return err - } - if err := batch.Close(); err != nil { - return err - } - return nil - }(); err != nil { - return err - } - } - - return nil -} diff --git a/store/v2/root/store.go b/store/v2/root/store.go index 43b1657473a8..62db44c9424d 100644 --- a/store/v2/root/store.go +++ b/store/v2/root/store.go @@ -256,35 +256,23 @@ func (s *Store) LoadVersionAndUpgrade(version uint64, upgrades *corestore.StoreU return fmt.Errorf("cannot upgrade while migrating") } - // we need to prune the old store keys from SS and SC - var ( - removedStoreKeys []string - removedKVStores []corestore.KVStoreWithBatch - ) + // we need to prune the old store keys from SS + var removedStoreKeys []string removedStoreKeys = append(removedStoreKeys, upgrades.Deleted...) for _, renamedStore := range upgrades.Renamed { removedStoreKeys = append(removedStoreKeys, renamedStore.OldKey) } - for _, storeKey := range removedStoreKeys { - removedKVStores = append(removedKVStores, s.stateCommitment.GetKVStoreWithBatch(storeKey)) - } - if err := s.loadVersion(version, upgrades); err != nil { return err } - if err := s.pruningManager.PruneKVStores(removedKVStores); err != nil { - return fmt.Errorf("failed to set pruned KVStores: %w", err) - } // if the state storage implements the UpgradableDatabase interface, prune the // old store keys upgradableDatabase, ok := s.stateStorage.(store.UpgradableDatabase) if ok { - for _, storeKey := range removedStoreKeys { - if err := upgradableDatabase.PruneStoreKey([]byte(storeKey)); err != nil { - return fmt.Errorf("failed to prune store key %s: %w", storeKey, err) - } + if err := upgradableDatabase.PruneStoreKeys(removedStoreKeys, version); err != nil { + return fmt.Errorf("failed to prune store keys %v: %w", removedStoreKeys, err) } } diff --git a/store/v2/root/upgrade_test.go b/store/v2/root/upgrade_test.go index 1daeed462622..69bfb3057717 100644 --- a/store/v2/root/upgrade_test.go +++ b/store/v2/root/upgrade_test.go @@ -35,16 +35,19 @@ func (s *UpgradeStoreTestSuite) SetupTest() { s.commitDB = dbm.NewMemDB() multiTrees := make(map[string]commitment.Tree) - for _, storeKey := range storeKeys { + newTreeFn := func(storeKey string) (commitment.Tree, error) { prefixDB := dbm.NewPrefixDB(s.commitDB, []byte(storeKey)) - multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) + return iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()), nil + } + for _, storeKey := range storeKeys { + multiTrees[storeKey], _ = newTreeFn(storeKey) } // create storage and commitment stores sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) ss := storage.NewStorageStore(sqliteDB, testLog) - sc, err := commitment.NewCommitStore(multiTrees, s.commitDB, nil, testLog) + sc, err := commitment.NewCommitStore(multiTrees, s.commitDB, newTreeFn, testLog) s.Require().NoError(err) pm := pruning.NewManager(sc, ss, nil, nil) s.rootStore, err = New(testLog, ss, sc, pm, nil, nil) @@ -71,20 +74,21 @@ func (s *UpgradeStoreTestSuite) loadWithUpgrades(upgrades *corestore.StoreUpgrad // create a new commitment store multiTrees := make(map[string]commitment.Tree) - for _, storeKey := range storeKeys { + newTreeFn := func(storeKey string) (commitment.Tree, error) { prefixDB := dbm.NewPrefixDB(s.commitDB, []byte(storeKey)) - multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) + return iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()), nil + } + for _, storeKey := range storeKeys { + multiTrees[storeKey], _ = newTreeFn(storeKey) } for _, added := range upgrades.Added { - prefixDB := dbm.NewPrefixDB(s.commitDB, []byte(added)) - multiTrees[added] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) + multiTrees[added], _ = newTreeFn(added) } for _, rename := range upgrades.Renamed { - prefixDB := dbm.NewPrefixDB(s.commitDB, []byte(rename.NewKey)) - multiTrees[rename.NewKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) + multiTrees[rename.NewKey], _ = newTreeFn(rename.NewKey) } - sc, err := commitment.NewCommitStore(multiTrees, s.commitDB, nil, testLog) + sc, err := commitment.NewCommitStore(multiTrees, s.commitDB, newTreeFn, testLog) s.Require().NoError(err) pm := pruning.NewManager(sc, s.rootStore.GetStateStorage().(store.Pruner), nil, nil) s.rootStore, err = New(testLog, s.rootStore.GetStateStorage(), sc, pm, nil, nil) @@ -108,10 +112,22 @@ func (s *UpgradeStoreTestSuite) TestLoadVersionAndUpgrade() { err = s.rootStore.(store.UpgradeableStore).LoadVersionAndUpgrade(v, upgrades) s.Require().NoError(err) + keyCount := 10 + // check old store keys are queryable + oldStoreKeys := []string{"store1", "store3"} + for _, storeKey := range oldStoreKeys { + for version := uint64(1); version <= v; version++ { + for i := 0; i < keyCount; i++ { + proof, err := s.rootStore.Query([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + s.Require().NoError(err) + s.Require().NotNil(proof) + } + } + } + // commit changeset newStoreKeys := []string{"newStore1", "newStore2", "renamedStore1"} toVersion := uint64(40) - keyCount := 10 for version := v + 1; version <= toVersion; version++ { cs := corestore.NewChangeset() for _, storeKey := range newStoreKeys { @@ -123,25 +139,6 @@ func (s *UpgradeStoreTestSuite) TestLoadVersionAndUpgrade() { s.Require().NoError(err) } - // check old store keys are pruned - oldStoreKeys := []string{"store1", "store3"} - for _, storeKey := range oldStoreKeys { - for version := uint64(1); version <= toVersion; version++ { - for i := 0; i < keyCount; i++ { - _, err := s.rootStore.Query([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) - s.Require().Error(err) - } - } - } - - // check commitDB is empty for old store keys - for _, storeKey := range oldStoreKeys { - oldKeyStore := dbm.NewPrefixDB(s.commitDB, []byte(storeKey)) - itr, err := oldKeyStore.Iterator(nil, nil) - s.Require().NoError(err) - s.Require().False(itr.Valid()) - } - // check new store keys are queryable for _, storeKey := range newStoreKeys { for version := v + 1; version <= toVersion; version++ { diff --git a/store/v2/storage/pebbledb/db.go b/store/v2/storage/pebbledb/db.go index 17e947cd0bab..ab5e051845c3 100644 --- a/store/v2/storage/pebbledb/db.go +++ b/store/v2/storage/pebbledb/db.go @@ -25,10 +25,11 @@ const ( // batchBufferSize defines the maximum size of a batch before it is committed. batchBufferSize = 100_000 - StorePrefixTpl = "s/k:%s/" // s/k: - latestVersionKey = "s/_latest" // NB: latestVersionKey key must be lexically smaller than StorePrefixTpl - pruneHeightKey = "s/_prune_height" // NB: pruneHeightKey key must be lexically smaller than StorePrefixTpl - tombstoneVal = "TOMBSTONE" + StorePrefixTpl = "s/k:%s/" // s/k: + removedStoreKeysPrefix = "s/_removed_keys" // NB: removedStoreKeys key must be lexically smaller than StorePrefixTpl + latestVersionKey = "s/_latest" // NB: latestVersionKey key must be lexically smaller than StorePrefixTpl + pruneHeightKey = "s/_prune_height" // NB: pruneHeightKey key must be lexically smaller than StorePrefixTpl + tombstoneVal = "TOMBSTONE" ) var ( @@ -274,6 +275,10 @@ func (db *Database) Prune(version uint64) error { } } + if err := db.deleteRemovedStoreKeys(version); err != nil { + return err + } + return db.setPruneHeight(version) } @@ -325,26 +330,14 @@ func (db *Database) ReverseIterator(storeKey []byte, version uint64, start, end return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, true), nil } -func (db *Database) PruneStoreKey(storeKey []byte) error { +func (db *Database) PruneStoreKeys(storeKeys []string, version uint64) error { batch := db.storage.NewBatch() defer batch.Close() - itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: storePrefix(storeKey), UpperBound: storePrefix(util.CopyIncr(storeKey))}) - if err != nil { - return err - } - defer itr.Close() - - for itr.First(); itr.Valid(); itr.Next() { - if err := batch.Delete(itr.Key(), nil); err != nil { + for _, storeKey := range storeKeys { + if err := batch.Set([]byte(fmt.Sprintf("%s%020d/%s", removedStoreKeysPrefix, version, storeKey)), []byte{}, nil); err != nil { return err } - if batch.Len() >= batchBufferSize { - if err := batch.Commit(&pebble.WriteOptions{Sync: db.sync}); err != nil { - return err - } - batch.Reset() - } } return batch.Commit(&pebble.WriteOptions{Sync: db.sync}) @@ -464,3 +457,52 @@ func getMVCCSlice(db *pebble.DB, storeKey, key []byte, version uint64) ([]byte, value, err := itr.ValueAndErr() return slices.Clone(value), err } + +func (db *Database) deleteRemovedStoreKeys(version uint64) error { + batch := db.storage.NewBatch() + defer batch.Close() + + end := []byte(fmt.Sprintf("%s%020d/", removedStoreKeysPrefix, version+1)) + storeKeyIter, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: []byte(removedStoreKeysPrefix), UpperBound: end}) + if err != nil { + return err + } + defer storeKeyIter.Close() + + var storeKeys []string + for storeKeyIter.First(); storeKeyIter.Valid(); storeKeyIter.Next() { + storeKey := string(storeKeyIter.Key()[len(end):]) + storeKeys = append(storeKeys, storeKey) + if err := batch.Delete(storeKeyIter.Key(), nil); err != nil { + return err + } + } + + for _, key := range storeKeys { + if err := func() error { + storeKey := []byte(key) + itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: storePrefix(storeKey), UpperBound: storePrefix(util.CopyIncr(storeKey))}) + if err != nil { + return err + } + defer itr.Close() + + for itr.First(); itr.Valid(); itr.Next() { + if err := batch.Delete(itr.Key(), nil); err != nil { + return err + } + if batch.Len() >= batchBufferSize { + if err := batch.Commit(&pebble.WriteOptions{Sync: db.sync}); err != nil { + return err + } + batch.Reset() + } + } + return nil + }(); err != nil { + return err + } + } + + return batch.Commit(&pebble.WriteOptions{Sync: true}) +} diff --git a/store/v2/storage/rocksdb/db.go b/store/v2/storage/rocksdb/db.go index fcb61eff634e..82b1c9be6a66 100644 --- a/store/v2/storage/rocksdb/db.go +++ b/store/v2/storage/rocksdb/db.go @@ -199,9 +199,9 @@ func (db *Database) ReverseIterator(storeKey []byte, version uint64, start, end return newRocksDBIterator(itr, prefix, start, end, true), nil } -// PruneStoreKey will do nothing for RocksDB, it will be pruned by compaction +// PruneStoreKeys will do nothing for RocksDB, it will be pruned by compaction // when the version is pruned -func (db *Database) PruneStoreKey(storeKey []byte) error { +func (db *Database) PruneStoreKeys(_ []string, _ uint64) error { return nil } diff --git a/store/v2/storage/sqlite/db.go b/store/v2/storage/sqlite/db.go index 6780ea4cad58..0e5d34b31f4f 100644 --- a/store/v2/storage/sqlite/db.go +++ b/store/v2/storage/sqlite/db.go @@ -22,6 +22,7 @@ const ( reservedStoreKey = "_RESERVED_" keyLatestHeight = "latest_height" keyPruneHeight = "prune_height" + keyRemovedStore = "removed_store" reservedUpsertStmt = ` INSERT INTO state_storage(store_key, key, value, version) @@ -203,14 +204,28 @@ func (db *Database) Prune(version uint64) error { ) AND store_key != ?; ` - _, err = tx.Exec(pruneStmt, version, reservedStoreKey) - if err != nil { + if _, err = tx.Exec(pruneStmt, version, reservedStoreKey); err != nil { + return fmt.Errorf("failed to exec SQL statement: %w", err) + } + + // prune removed store keys + pruneRemovedStoreKeysStmt := `DELETE FROM state_storage + WHERE store_key in ( + SELECT value FROM state_storage + WHERE store_key = ? AND key = ? AND version <= ? + ); + ` + + if _, err = tx.Exec(pruneRemovedStoreKeysStmt, reservedStoreKey, keyRemovedStore, version); err != nil { + return fmt.Errorf("failed to exec SQL statement: %w", err) + } + + if _, err = tx.Exec("DELETE FROM state_storage WHERE store_key = ? AND key = ? AND version <= ?", reservedStoreKey, keyPruneHeight, version); err != nil { return fmt.Errorf("failed to exec SQL statement: %w", err) } // set the prune height so we can return for queries below this height - _, err = tx.Exec(reservedUpsertStmt, reservedStoreKey, keyPruneHeight, version, 0, version) - if err != nil { + if _, err = tx.Exec(reservedUpsertStmt, reservedStoreKey, keyPruneHeight, version, 0, version); err != nil { return fmt.Errorf("failed to exec SQL statement: %w", err) } @@ -247,19 +262,27 @@ func (db *Database) ReverseIterator(storeKey []byte, version uint64, start, end return newIterator(db, storeKey, version, start, end, true) } -func (db *Database) PruneStoreKey(storeKey []byte) error { +func (db *Database) PruneStoreKeys(storeKeys []string, version uint64) (err error) { tx, err := db.storage.Begin() if err != nil { return fmt.Errorf("failed to create SQL transaction: %w", err) } defer func() { - _ = tx.Rollback() + err = tx.Rollback() }() - _, err = tx.Exec(`DELETE FROM state_storage WHERE store_key = ?`, storeKey) - if err != nil { - return fmt.Errorf("failed to exec SQL statement: %w", err) + // flush removed store keys + flushRemovedStoreKeysStmt := ` + INSERT INTO state_storage(store_key, key, value, version) + VALUES(?, ?, ?, ?); + ` + + for _, key := range storeKeys { + _, err = tx.Exec(flushRemovedStoreKeysStmt, reservedStoreKey, key, version) + if err != nil { + return fmt.Errorf("failed to exec SQL statement: %w", err) + } } if err := tx.Commit(); err != nil { diff --git a/store/v2/storage/storage_test_suite.go b/store/v2/storage/storage_test_suite.go index 42b24474aff2..8b29f18cbb79 100644 --- a/store/v2/storage/storage_test_suite.go +++ b/store/v2/storage/storage_test_suite.go @@ -665,20 +665,6 @@ func (s *StorageTestSuite) TestUpgradable() { } } - // prune store1 - err = ss.PruneStoreKey([]byte(storeKeys[0])) - s.Require().NoError(err) - // skip the test of RocksDB - if !slices.Contains(s.SkipTests, "TestUpgradable_Prune") { - for v := uint64(1); v <= uptoVersion; v++ { - for i := 0; i < keyCount; i++ { - bz, err := ss.Get([]byte(storeKeys[0]), v, []byte(fmt.Sprintf("key%03d", i))) - s.Require().NoError(err) - s.Require().Nil(bz) - } - } - } - // migrate store2 newStoreKey := "mstore2" err = ss.MigrateStoreKey([]byte(storeKeys[1]), []byte(newStoreKey)) @@ -691,6 +677,35 @@ func (s *StorageTestSuite) TestUpgradable() { s.Require().Equal([]byte(fmt.Sprintf("val%03d-%03d", i, v)), bz) } } + + // prune storekeys store1 + err = ss.PruneStoreKeys([]string{storeKeys[0]}, uptoVersion) + s.Require().NoError(err) + + removedStoreKeys := []string{storeKeys[0], storeKeys[2]} + // should be able to query before Prune for removed storeKeys + for _, storeKey := range removedStoreKeys { + for v := uint64(1); v <= uptoVersion; v++ { + for i := 0; i < keyCount; i++ { + bz, err := ss.Get([]byte(storeKey), v, []byte(fmt.Sprintf("key%03d", i))) + s.Require().NoError(err) + s.Require().Equal([]byte(fmt.Sprintf("val%03d-%03d", i, v)), bz) + } + } + } + s.Require().NoError(ss.Prune(uptoVersion)) + // should not be able to query after Prune + // skip the test of RocksDB + if !slices.Contains(s.SkipTests, "TestUpgradable_Prune") { + for _, storeKey := range removedStoreKeys { + for v := uint64(1); v <= uptoVersion; v++ { + for i := 0; i < keyCount; i++ { + _, err := ss.Get([]byte(storeKey), v, []byte(fmt.Sprintf("key%03d", i))) + s.Require().Error(err) + } + } + } + } } func DBApplyChangeset( diff --git a/store/v2/storage/store.go b/store/v2/storage/store.go index 0f9d66317cab..b9cc6162ad5b 100644 --- a/store/v2/storage/store.go +++ b/store/v2/storage/store.go @@ -139,15 +139,15 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.Stat return nil } -// PruneStoreKey prunes the store key which implements the store.UpgradableDatabase +// PruneStoreKeys prunes the store keys which implements the store.UpgradableDatabase // interface. -func (ss *StorageStore) PruneStoreKey(storeKey []byte) error { +func (ss *StorageStore) PruneStoreKeys(storeKeys []string, version uint64) error { gdb, ok := ss.db.(store.UpgradableDatabase) if !ok { return errors.New("db does not implement UpgradableDatabase interface") } - return gdb.PruneStoreKey(storeKey) + return gdb.PruneStoreKeys(storeKeys, version) } // MigrateStoreKey migrates the store key which implements the store.UpgradableDatabase