Skip to content

Commit

Permalink
fix: upgrade ledger to reset pending infos (#1199)
Browse files Browse the repository at this point in the history
* fix: migration ledger to reset pendings

* feat: add testcase
  • Loading branch information
zengchen221 committed Jul 24, 2020
1 parent 70be3bf commit 389227a
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 11 deletions.
1 change: 1 addition & 0 deletions common/storage/storage.go
Expand Up @@ -93,6 +93,7 @@ const (
KeyPrefixContractValue = 103 // idPrefixContractValue ledger.go
KeyPrefixVmLogs = 104 // vm_store.go
KeyPrefixVMStorage = 105 // vm_store.go
KeyPrefixPendingBackup = 253
KeyPrefixGenericType = 254
KeyPrefixGenericTypeC = 255
)
Expand Down
3 changes: 2 additions & 1 deletion ledger/ledger.go
Expand Up @@ -99,7 +99,7 @@ var (
lock = sync.RWMutex{}
)

const version = 15
const version = 16

func NewLedger(cfgFile string) *Ledger {
lock.Lock()
Expand Down Expand Up @@ -265,6 +265,7 @@ func (l *Ledger) upgrade() error {
new(migration.MigrationV12ToV13),
new(migration.MigrationV13ToV14),
new(migration.MigrationV14ToV15),
new(migration.MigrationV15ToV16),
}

err = migration.Upgrade(ms, l.store)
Expand Down
4 changes: 2 additions & 2 deletions ledger/ledger_pending.go
Expand Up @@ -81,10 +81,10 @@ func (l *Ledger) GetPendings(fn func(pendingKey *types.PendingKey, pendingInfo *
}
pendingInfo := new(types.PendingInfo)
if err := pendingInfo.Deserialize(val); err != nil {
return fmt.Errorf("pendingKey deserialize: %s", err)
return fmt.Errorf("pendingInfo deserialize: %s", err)
}
if err := fn(pendingKey, pendingInfo); err != nil {
return fmt.Errorf("pendingKey deserialize: %s", err)
return fmt.Errorf("pending func: %s", err)
}
return nil
})
Expand Down
118 changes: 113 additions & 5 deletions ledger/migration/migration.go
Expand Up @@ -162,12 +162,12 @@ type MigrationV13ToV14 struct {

func (m MigrationV13ToV14) Migrate(store storage.Store) error {
return store.BatchWrite(false, func(batch storage.Batch) error {
fmt.Println("migrate ledger v13 to v14 ")
b, err := checkVersion(m, store)
if err != nil {
return err
}
if b {
fmt.Println("migrate ledger v13 to v14 ")
if err := store.Upgrade(m.StartVersion()); err != nil {
return fmt.Errorf("migrate to %d error: %s", m.EndVersion(), err)
}
Expand All @@ -185,7 +185,7 @@ func (m MigrationV13ToV14) EndVersion() int {
return 14
}

type contractKV struct {
type bytesKV struct {
key []byte
value []byte
}
Expand All @@ -195,21 +195,21 @@ type MigrationV14ToV15 struct {

func (m MigrationV14ToV15) Migrate(store storage.Store) error {
return store.BatchWrite(false, func(batch storage.Batch) error {
fmt.Println("migrate ledger v14 to v15 ")
b, err := checkVersion(m, store)
if err != nil {
return err
}
if b {
cs := make([]contractKV, 0)
fmt.Println("migrate ledger v14 to v15 ")
cs := make([]bytesKV, 0)
prefix, _ := storage.GetKeyOfParts(storage.KeyPrefixTrieVMStorage)
if err := store.Iterator(prefix, nil, func(k, v []byte) error {
//fmt.Println("==key ", k)
key := make([]byte, len(k))
copy(key, k)
value := make([]byte, len(v))
copy(value, v)
c := contractKV{
c := bytesKV{
key: key,
value: value,
}
Expand Down Expand Up @@ -257,6 +257,114 @@ func (m MigrationV14ToV15) EndVersion() int {
return 15
}

type pendingKV struct {
key *types.PendingKey
value []byte
}

type MigrationV15ToV16 struct {
}

func (m MigrationV15ToV16) Migrate(store storage.Store) error {
b, err := checkVersion(m, store)
if err != nil {
return err
}
if b {
fmt.Println("migrate ledger v15 to v16 ")
count := 0
reset := false
bs := make([]bytesKV, 0)
pendingKvs := make([]*pendingKV, 0)

// get all pending infos from db
prefix, _ := storage.GetKeyOfParts(storage.KeyPrefixPending)
err := store.Iterator(prefix, nil, func(k, v []byte) error {
key := make([]byte, len(k))
copy(key, k)
value := make([]byte, len(v))
copy(value, v)
c := bytesKV{
key: key,
value: value,
}
bs = append(bs, c)

pk := new(types.PendingKey)
if err := pk.Deserialize(key[1:]); err != nil {
if _, err := pk.UnmarshalMsg(key[1:]); err != nil {
return fmt.Errorf("pendingKey unmarshalMsg err: %s", err)
}
reset = true
}
pkv := &pendingKV{
key: pk,
value: value,
}
pendingKvs = append(pendingKvs, pkv)
return nil
})
if err != nil {
return fmt.Errorf("get pendings info err: %s", err)
}

if reset {
// copy all pending infos to another table
err = store.BatchWrite(false, func(batch storage.Batch) error {
for _, b := range bs {
nk := make([]byte, 0)
nk = append(nk, storage.KeyPrefixPendingBackup)
nk = append(nk, b.key[1:]...)
if err := batch.Put(nk, b.value); err != nil {
return err
}
count++
}
return nil
})
if err != nil {
return fmt.Errorf("backup pending table error: %s", err)
}

// reset pending table
if count != len(pendingKvs) {
return fmt.Errorf("pending count err: %d, %d", count, len(pendingKvs))
}
return store.BatchWrite(false, func(batch storage.Batch) error {
for _, b := range bs {
if err := batch.Delete(b.key); err != nil {
return err
}
}

for _, pkv := range pendingKvs {
pKey, err := storage.GetKeyOfParts(storage.KeyPrefixPending, pkv.key)
if err != nil {
return err
}
if err := batch.Put(pKey, pkv.value); err != nil {
return err
}
}
return updateVersion(m, batch)
})
} else {
return store.BatchWrite(false, func(batch storage.Batch) error {
return updateVersion(m, batch)
})
}
}
return nil
}

func (m MigrationV15ToV16) StartVersion() int {
return 15
}

func (m MigrationV15ToV16) EndVersion() int {
return 16
}

func checkVersion(m Migration, s storage.Store) (bool, error) {
v, err := getVersion(s)
if err != nil {
Expand Down
51 changes: 50 additions & 1 deletion ledger/migration/migration_test.go
Expand Up @@ -2,6 +2,7 @@ package migration

import (
"encoding/binary"
"math/big"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestMigration_Migrate(t *testing.T) {
if err := store.Put(frontierK, frontierV); err != nil {
t.Fatal(err)
}
migrations := []Migration{MigrationV11ToV12{}, MigrationV12ToV13{}, MigrationV13ToV14{}, MigrationV14ToV15{}}
migrations := []Migration{MigrationV11ToV12{}, MigrationV12ToV13{}, MigrationV13ToV14{}, MigrationV14ToV15{}, MigrationV15ToV16{}}
if err := Upgrade(migrations, store); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -119,3 +120,51 @@ func TestMigration_MigrateV14ToV15(t *testing.T) {
}

}

func TestMigration_MigrateV15ToV16(t *testing.T) {
dir := filepath.Join(config.QlcTestDataDir(), "store", uuid.New().String())
store, err := db.NewBadgerStore(dir)
if err != nil {
t.Fatal(err)
}
defer func() {
os.Remove(dir)
}()

key := []byte{byte(storage.KeyPrefixVersion)}
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(buf, 15)
if err := store.Put(key, buf[:n]); err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
pendingKey := &types.PendingKey{
Address: mock.Address(),
Hash: mock.Hash(),
}
pendingInfo := &types.PendingInfo{
Source: mock.Address(),
Type: mock.Hash(),
Amount: types.Balance{Int: big.NewInt(100)},
}
kBytes, err := pendingKey.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
pk := make([]byte, 0)
pk = append(pk, byte(storage.KeyPrefixPending))
pk = append(pk, kBytes...)
iBytes, err := pendingInfo.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
if err := store.Put(pk, iBytes); err != nil {
t.Fatal(err)
}
}
migrations := []Migration{MigrationV15ToV16{}}
if err := Upgrade(migrations, store); err != nil {
t.Fatal(err)
}
}
5 changes: 4 additions & 1 deletion rpc/api/ledger_test.go
Expand Up @@ -261,8 +261,11 @@ func TestLedgerAPI_AccountBlocksCount(t *testing.T) {
}

func TestLedgerAPI_AccountHistoryTopn(t *testing.T) {
teardownTestCase, _, ledgerApi := setupDefaultLedgerAPI(t)
teardownTestCase, l, ledgerApi := setupDefaultLedgerAPI(t)
defer teardownTestCase(t)
if err := l.Flush(); err != nil {
t.Fatal(err)
}
r, err := ledgerApi.AccountHistoryTopn(account1.Address(), 100, nil)
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 4 additions & 1 deletion rpc/grpc/apis/ledger_test.go
Expand Up @@ -140,8 +140,11 @@ func TestLedgerAPI_AccountBlocksCount(t *testing.T) {
}

func TestLedgerAPI_AccountHistoryTopn(t *testing.T) {
teardownTestCase, _, ledgerApi := setupDefaultLedgerAPI(t)
teardownTestCase, l, ledgerApi := setupDefaultLedgerAPI(t)
defer teardownTestCase(t)
if err := l.Flush(); err != nil {
t.Fatal(err)
}
r, err := ledgerApi.AccountHistoryTopn(context.Background(), &pb.AccountHistoryTopnReq{
Address: account1.Address().String(),
Count: 100,
Expand Down

0 comments on commit 389227a

Please sign in to comment.