Skip to content

Commit

Permalink
removing old store keys by pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope committed Jul 10, 2024
1 parent 2a99fe4 commit 4c098ac
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 159 deletions.
3 changes: 2 additions & 1 deletion store/v2/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (t *IavlTree) Get(version uint64, key []byte) ([]byte, error) {

// GetLatestVersion returns the latest version of the tree.
func (t *IavlTree) GetLatestVersion() (uint64, error) {
return uint64(t.tree.Version()), nil
v, err := t.tree.GetLatestVersion()
return uint64(v), err
}

// SetInitialVersion sets the initial version of the database.
Expand Down
73 changes: 71 additions & 2 deletions store/v2/commitment/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ package commitment
import (
"bytes"
"fmt"
"slices"

corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2/internal/encoding"
"cosmossdk.io/store/v2/proof"
)

const (
commitInfoKeyFmt = "c/%d" // c/<version>
latestVersionKey = "c/latest"
commitInfoKeyFmt = "c/%d" // c/<version>
latestVersionKey = "c/latest"
removedStoreKeyPrefix = "c/removed/" // c/removed/<version>/<store-name>
)

type MetadataStore struct {
Expand Down Expand Up @@ -99,6 +101,73 @@ func (m *MetadataStore) flushCommitInfo(version uint64, cInfo *proof.CommitInfo)
return batch.Close()
}

func (m *MetadataStore) flushRemovedStoreKeys(version uint64, storeKeys []string) (err error) {
batch := m.kv.NewBatch()
defer func() {
err = batch.Close()
}()

for _, storeKey := range storeKeys {
key := []byte(fmt.Sprintf("%s%020d/%s", removedStoreKeyPrefix, version, storeKey))
if err := batch.Set(key, []byte{}); err != nil {
return err
}
}
return batch.WriteSync()
}

func (m *MetadataStore) getRemovedStoreKeys(version uint64) (storeKeys []string, err error) {
end := []byte(fmt.Sprintf("%s%020d/", removedStoreKeyPrefix, version+1))
iter, err := m.kv.Iterator([]byte(removedStoreKeyPrefix), end)
if err != nil {
return nil, err
}
defer func() {
if ierr := iter.Close(); ierr != nil {
err = ierr
}
}()

for ; iter.Valid(); iter.Next() {
storeKey := string(iter.Key()[len(end):])
storeKeys = append(storeKeys, storeKey)
}

return
}

func (m *MetadataStore) deleteRemovedStoreKeys(storeKeys []string, version uint64) (err error) {
batch := m.kv.NewBatch()
defer func() {
if berr := batch.Close(); berr != nil {
err = berr
}
}()

end := []byte(fmt.Sprintf("%s%020d/", removedStoreKeyPrefix, version+1))
iter, err := m.kv.Iterator([]byte(removedStoreKeyPrefix), end)
if err != nil {
return err
}
defer func() {
if ierr := iter.Close(); ierr != nil {
err = ierr
}
}()

for ; iter.Valid(); iter.Next() {
storeKey := string(iter.Key()[len(end):])
if !slices.Contains(storeKeys, storeKey) {
continue
}
if err := batch.Delete(iter.Key()); err != nil {
return nil
}
}

return batch.WriteSync()
}

func (m *MetadataStore) deleteCommitInfo(version uint64) error {
cInfoKey := []byte(fmt.Sprintf(commitInfoKeyFmt, version))
return m.kv.Delete(cInfoKey)
Expand Down
92 changes: 86 additions & 6 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math"
"sort"
"sync"

protoio "github.com/cosmos/gogoproto/io"

Expand Down Expand Up @@ -43,7 +44,7 @@ type CommitStore struct {
multiTrees map[string]Tree

mountTreeFn MountTreeFn
oldTrees map[string]Tree
oldTrees sync.Map
}

// NewCommitStore creates a new CommitStore instance.
Expand All @@ -53,7 +54,7 @@ func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, mountT
multiTrees: trees,
metadata: NewMetadataStore(db),
mountTreeFn: mountTreeFn,
oldTrees: make(map[string]Tree),
oldTrees: sync.Map{},
}, nil
}

Expand Down Expand Up @@ -132,12 +133,14 @@ func (c *CommitStore) LoadVersionAndUpgrade(targetVersion uint64, upgrades *core
}

newStoreKeys := make([]string, 0, len(c.multiTrees))
removedStoreKeys := make([]string, 0)
for _, storeKey := range storeKeys {
// If it has been deleted, remove the tree.
if upgrades.IsDeleted(storeKey) {
if err := removeTree(storeKey); err != nil {
return err
}
removedStoreKeys = append(removedStoreKeys, storeKey)
continue
}

Expand All @@ -155,10 +158,15 @@ func (c *CommitStore) LoadVersionAndUpgrade(targetVersion uint64, upgrades *core
if err := c.migrateKVStore(oldKey, storeKey); err != nil {
return err
}
removedStoreKeys = append(removedStoreKeys, oldKey)
}
newStoreKeys = append(newStoreKeys, storeKey)
}

if err := c.metadata.flushRemovedStoreKeys(targetVersion, removedStoreKeys); err != nil {
return err
}

return c.loadVersion(targetVersion, newStoreKeys)
}

Expand Down Expand Up @@ -322,13 +330,16 @@ func (c *CommitStore) GetProof(storeKey []byte, version uint64, key []byte) ([]p
if !ok {
// If the tree is not found, it means the store is an old store that has been
// deleted or renamed. We should use the old tree to get the proof.
if tree, ok = c.oldTrees[rawStoreKey]; !ok {
v, ok := c.oldTrees.Load(rawStoreKey)
if !ok {
var err error
tree, err = c.mountTreeFn(rawStoreKey)
if err != nil {
return nil, fmt.Errorf("store %s not found: %w", storeKey, err)
}
c.oldTrees[rawStoreKey] = tree
c.oldTrees.Store(rawStoreKey, tree)
} else {
tree = v.(Tree)
}
}

Expand Down Expand Up @@ -371,15 +382,84 @@ func (c *CommitStore) Prune(version uint64) (ferr error) {
// prune the metadata
for v := version; v > 0; v-- {
if err := c.metadata.deleteCommitInfo(v); err != nil {
return err
ferr = errors.Join(ferr, err)
}
}

// prune the trees
for _, tree := range c.multiTrees {
if err := tree.Prune(version); err != nil {
ferr = errors.Join(ferr, err)
}
}
// prune the removed store keys
if err := c.pruneRemovedStoreKeys(version); err != nil {
ferr = errors.Join(ferr, err)
}

return ferr
}

func (c *CommitStore) pruneRemovedStoreKeys(version uint64) (ferr error) {
removedStoreKeys, err := c.metadata.getRemovedStoreKeys(version)
if err != nil {
ferr = errors.Join(ferr, err)
}
for _, storeKey := range removedStoreKeys {
tree, err := c.mountTreeFn(storeKey)
if err != nil {
ferr = errors.Join(ferr, err)
continue
}

clearKVStore := func(kvStore corestore.KVStoreWithBatch) (err error) {
batch := kvStore.NewBatch()
iter, err := kvStore.Iterator(nil, nil)
if err != nil {
return err
}
defer func() {
err = iter.Close()
err = batch.Close()
}()

for ; iter.Valid(); iter.Next() {
if err := batch.Delete(iter.Key()); err != nil {
return err
}
bs, err := batch.GetByteSize()
if err != nil {
return err
}
if bs > batchFlushThreshold {
if err := batch.Write(); err != nil {
return err
}
if err := batch.Close(); err != nil {
return err
}
batch = kvStore.NewBatch()
}
}
if err := batch.Write(); err != nil {
return err
}
if err := batch.Close(); err != nil {
return err
}
return nil
}

if kvStoreGetter, ok := tree.(KVStoreGetter); ok {
kvStore := kvStoreGetter.GetKVStoreWithBatch()
if err := clearKVStore(kvStore); err != nil {
ferr = errors.Join(ferr, err)
}
}
}

if err := c.metadata.deleteRemovedStoreKeys(removedStoreKeys, version); err != nil {
ferr = errors.Join(ferr, err)
}

return ferr
}
Expand Down
13 changes: 13 additions & 0 deletions store/v2/commitment/store_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,17 @@ func (s *CommitStoreTestSuite) TestStore_Upgrades() {
s.Require().NotNil(proof)
}
}

// prune the old stores
s.Require().NoError(commitStore.Prune(latestVersion))

// GetProof should fail for the old stores
for _, storeKey := range []string{storeKey1, storeKey3} {
for i := uint64(1); i <= latestVersion; i++ {
for j := 0; j < kvCount; j++ {
_, err := commitStore.GetProof([]byte(storeKey), i, []byte(fmt.Sprintf("key-%d-%d", i, j)))
s.Require().Error(err)
}
}
}
}
4 changes: 2 additions & 2 deletions store/v2/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type VersionedDatabase interface {
// UpgradableDatabase defines an API for a versioned database that allows storeKey
// upgrades.
type UpgradableDatabase interface {
// PruneStoreKey prunes all data associated with the given storeKey.
PruneStoreKey(storeKey []byte) error
// PruneStoreKeys prunes all data associated with the given storeKeys.
PruneStoreKeys(storeKeys []string, version uint64) error
// MigrateStoreKey upgrades the storeKey from the old to the new storeKey.
MigrateStoreKey(oldStoreKey, newStoreKey []byte) error
}
Expand Down
2 changes: 1 addition & 1 deletion store/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
cosmossdk.io/log v1.3.1
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/gogoproto v1.5.0
github.com/cosmos/iavl v1.2.0
github.com/cosmos/iavl v1.2.1-0.20240709120915-7cbae69fd94d
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3
Expand Down
2 changes: 2 additions & 0 deletions store/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/cosmos/gogoproto v1.5.0 h1:SDVwzEqZDDBoslaeZg+dGE55hdzHfgUA40pEanMh52
github.com/cosmos/gogoproto v1.5.0/go.mod h1:iUM31aofn3ymidYG6bUR5ZFrk+Om8p5s754eMUcyp8I=
github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM=
github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/iavl v1.2.1-0.20240709120915-7cbae69fd94d h1:3+1Zpq1HG4pjwRKEGIEF1pcsH/hbx0LuHSBuzzX0kYo=
github.com/cosmos/iavl v1.2.1-0.20240709120915-7cbae69fd94d/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
54 changes: 0 additions & 54 deletions store/v2/pruning/manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package pruning

import (
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)

const batchFlushThreshold = 1 << 16 // 64KB

// Manager is a struct that manages the pruning of old versions of the SC and SS.
type Manager struct {
// scPruner is the pruner for the SC.
Expand Down Expand Up @@ -72,54 +69,3 @@ func (m *Manager) SignalCommit(start bool, version uint64) error {

return nil
}

// PruneKVStores prunes KVStores which are needed to be pruned by upgrading the
// store key.
func (m *Manager) PruneKVStores(kvStores []corestore.KVStoreWithBatch) error {
for _, kvStore := range kvStores {
if kvStore == nil {
continue
}
if err := func() error {
batch := kvStore.NewBatch()
iter, err := kvStore.Iterator(nil, nil)
if err != nil {
return err
}
defer func() {
_ = iter.Close()
_ = batch.Close()
}()

for ; iter.Valid(); iter.Next() {
if err := batch.Delete(iter.Key()); err != nil {
return err
}
bs, err := batch.GetByteSize()
if err != nil {
return err
}
if bs > batchFlushThreshold {
if err := batch.Write(); err != nil {
return err
}
if err := batch.Close(); err != nil {
return err
}
batch = kvStore.NewBatch()
}
}
if err := batch.Write(); err != nil {
return err
}
if err := batch.Close(); err != nil {
return err
}
return nil
}(); err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit 4c098ac

Please sign in to comment.