Skip to content

Commit

Permalink
add batches (#564)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangkai <zhangkai.gis@163.com>
  • Loading branch information
tac0turtle and giskook committed Sep 16, 2022
1 parent e25a061 commit 1244f6b
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 62 deletions.
82 changes: 27 additions & 55 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"bytes"
"crypto/sha256"
"fmt"
"runtime"
"sort"
"sync"
"time"

"github.com/pkg/errors"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/iavl/internal/logger"
)

// commitGap after upgrade/delete commitGap FastNodes when commit the batch
var commitGap uint64 = 5000000

// ErrVersionDoesNotExist is returned if a requested version does not exist.
var ErrVersionDoesNotExist = errors.New("version does not exist")

Expand Down Expand Up @@ -627,12 +628,6 @@ func (tree *MutableTree) IsUpgradeable() (bool, error) {
// from latest tree.
// nolint: unparam
func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error) {
shouldForceUpdate, err := tree.ndb.shouldForceFastStorageUpgrade()
if err != nil {
return false, err
}
isFastStorageEnabled := tree.ndb.hasUpgradedToFastStorage()

isUpgradeable, err := tree.IsUpgradeable()
if err != nil {
return false, err
Expand All @@ -642,22 +637,29 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
return false, nil
}

if isFastStorageEnabled && shouldForceUpdate {
// If there is a mismatch between which fast nodes are on disk and the live state due to temporary
// downgrade and subsequent re-upgrade, we cannot know for sure which fast nodes have been removed while downgraded,
// Therefore, there might exist stale fast nodes on disk. As a result, to avoid persisting the stale state, it might
// be worth to delete the fast nodes from disk.
fastItr := NewFastIterator(nil, nil, true, tree.ndb)
defer fastItr.Close()
for ; fastItr.Valid(); fastItr.Next() {
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
// If there is a mismatch between which fast nodes are on disk and the live state due to temporary
// downgrade and subsequent re-upgrade, we cannot know for sure which fast nodes have been removed while downgraded,
// Therefore, there might exist stale fast nodes on disk. As a result, to avoid persisting the stale state, it might
// be worth to delete the fast nodes from disk.
fastItr := NewFastIterator(nil, nil, true, tree.ndb)
defer fastItr.Close()
var deletedFastNodes uint64
for ; fastItr.Valid(); fastItr.Next() {
deletedFastNodes++
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
return false, err
}
if deletedFastNodes%commitGap == 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}
}

// Force garbage collection before we proceed to enabling fast storage.
runtime.GC()
if deletedFastNodes%commitGap != 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}

if err := tree.enableFastStorageAndCommit(); err != nil {
tree.ndb.storageVersion = defaultStorageVersionValue
Expand All @@ -675,47 +677,17 @@ func (tree *MutableTree) enableFastStorageAndCommitLocked() error {
func (tree *MutableTree) enableFastStorageAndCommit() error {
var err error

// We start a new thread to keep on checking if we are above 4GB, and if so garbage collect.
// This thread only lasts during the fast node migration.
// This is done to keep RAM usage down.
done := make(chan struct{})
defer func() {
done <- struct{}{}
close(done)
}()

go func() {
timer := time.NewTimer(time.Second)
var m runtime.MemStats

for {
// Sample the current memory usage
runtime.ReadMemStats(&m)

if m.Alloc > 4*1024*1024*1024 {
// If we are using more than 4GB of memory, we should trigger garbage collection
// to free up some memory.
runtime.GC()
}

select {
case <-timer.C:
timer.Reset(time.Second)
case <-done:
if !timer.Stop() {
<-timer.C
}
return
}
}
}()

itr := NewIterator(nil, nil, true, tree.ImmutableTree)
defer itr.Close()
var upgradedFastNodes uint64
for ; itr.Valid(); itr.Next() {
upgradedFastNodes++
if err = tree.ndb.SaveFastNodeNoCache(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
return err
}
if upgradedFastNodes%commitGap == 0 {
tree.ndb.Commit()
}
}

if err = itr.Error(); err != nil {
Expand Down
116 changes: 109 additions & 7 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,12 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

iterMock := mock.NewMockIterator(ctrl)
dbMock.EXPECT().Iterator(gomock.Any(), gomock.Any()).Return(iterMock, nil)
iterMock.EXPECT().Error()
iterMock.EXPECT().Valid().Times(2)
iterMock.EXPECT().Close()

batchMock.EXPECT().Set(gomock.Any(), gomock.Any()).Return(expectedError).Times(1)

tree, err := NewMutableTree(dbMock, 0)
Expand Down Expand Up @@ -943,7 +949,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(3)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
Expand All @@ -964,8 +970,8 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
updatedExpectedStorageVersion[len(updatedExpectedStorageVersion)-1]++
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(1)
batchMock.EXPECT().Close().Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(2)
batchMock.EXPECT().Close().Return(nil).Times(2)

// iterMock is used to mock the underlying db iterator behing fast iterator
// Here, we want to mock the behavior of deleting fast nodes from disk when
Expand Down Expand Up @@ -1022,7 +1028,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

func TestUpgradeStorageToFast_Integration_Upgraded_FastIterator_Success(t *testing.T) {
// Setup
tree, mirror := setupTreeAndMirrorForUpgrade(t)
tree, mirror := setupTreeAndMirrorForUpgrade(t, 100)

isFastCacheEnabled, err := tree.IsFastCacheEnabled()
require.NoError(t, err)
Expand Down Expand Up @@ -1089,7 +1095,7 @@ func TestUpgradeStorageToFast_Integration_Upgraded_FastIterator_Success(t *testi

func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T) {
// Setup
tree, mirror := setupTreeAndMirrorForUpgrade(t)
tree, mirror := setupTreeAndMirrorForUpgrade(t, 100)

isFastCacheEnabled, err := tree.IsFastCacheEnabled()
require.NoError(t, err)
Expand Down Expand Up @@ -1148,12 +1154,108 @@ func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T)
})
}

func setupTreeAndMirrorForUpgrade(t *testing.T) (*MutableTree, [][]string) {
func TestUpgradeStorageToFast_Success(t *testing.T) {
tmpCommitGap := commitGap
commitGap = 1000
defer func() {
commitGap = tmpCommitGap
}()

type fields struct {
nodeCount int
}
tests := []struct {
name string
fields fields
}{
{"less than commit gap", fields{nodeCount: 100}},
{"equal to commit gap", fields{nodeCount: int(commitGap)}},
{"great than commit gap", fields{nodeCount: int(commitGap) + 100}},
{"two times commit gap", fields{nodeCount: int(commitGap) * 2}},
{"two times plus commit gap", fields{nodeCount: int(commitGap)*2 + 1}},
}

for _, tt := range tests {
tree, mirror := setupTreeAndMirrorForUpgrade(t, tt.fields.nodeCount)
enabled, err := tree.enableFastStorageAndCommitIfNotEnabled()
require.Nil(t, err)
require.True(t, enabled)
t.Run(tt.name, func(t *testing.T) {
i := 0
iter := NewFastIterator(nil, nil, true, tree.ndb)
for ; iter.Valid(); iter.Next() {
require.Equal(t, []byte(mirror[i][0]), iter.Key())
require.Equal(t, []byte(mirror[i][1]), iter.Value())
i++
}
require.Equal(t, len(mirror), i)
})
}
}

func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
// we delete fast node, in case of deadlock. we should limit the stale count lower than chBufferSize(64)
tmpCommitGap := commitGap
commitGap = 5
defer func() {
commitGap = tmpCommitGap
}()

valStale := "val_stale"
addStaleKey := func(ndb *nodeDB, staleCount int) {
var keyPrefix = "key"
for i := 0; i < staleCount; i++ {
key := fmt.Sprintf("%s_%d", keyPrefix, i)

node := NewFastNode([]byte(key), []byte(valStale), 100)
var buf bytes.Buffer
buf.Grow(node.encodedSize())
err := node.writeBytes(&buf)
require.NoError(t, err)
err = ndb.db.Set(ndb.fastNodeKey([]byte(key)), buf.Bytes())
require.NoError(t, err)
}
}
type fields struct {
nodeCount int
staleCount int
}

tests := []struct {
name string
fields fields
}{
{"stale less than commit gap", fields{nodeCount: 100, staleCount: 4}},
{"stale equal to commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)}},
{"stale great than commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap)*2 - 1}},
{"stale twice commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap) * 2}},
{"stale great than twice commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)*2 + 1}},
}

for _, tt := range tests {
tree, mirror := setupTreeAndMirrorForUpgrade(t, tt.fields.nodeCount)
addStaleKey(tree.ndb, tt.fields.staleCount)
enabled, err := tree.enableFastStorageAndCommitIfNotEnabled()
require.Nil(t, err)
require.True(t, enabled)
t.Run(tt.name, func(t *testing.T) {
i := 0
iter := NewFastIterator(nil, nil, true, tree.ndb)
for ; iter.Valid(); iter.Next() {
require.Equal(t, []byte(mirror[i][0]), iter.Key())
require.Equal(t, []byte(mirror[i][1]), iter.Value())
i++
}
require.Equal(t, len(mirror), i)
})
}
}

func setupTreeAndMirrorForUpgrade(t *testing.T, numEntries int) (*MutableTree, [][]string) {
db := db.NewMemDB()

tree, _ := NewMutableTree(db, 0)

const numEntries = 100
var keyPrefix, valPrefix = "key", "val"

mirror := make([][]string, 0, numEntries)
Expand Down

0 comments on commit 1244f6b

Please sign in to comment.