diff --git a/common/storage/storage.go b/common/storage/storage.go index 33ad3518..0d60c58a 100644 --- a/common/storage/storage.go +++ b/common/storage/storage.go @@ -93,6 +93,7 @@ const ( KeyPrefixContractValue = 103 // idPrefixContractValue ledger.go KeyPrefixVmLogs = 104 // vm_store.go KeyPrefixVMStorage = 105 // vm_store.go + KeyPrefixPendingBackup = 253 KeyPrefixGenericType = 254 KeyPrefixGenericTypeC = 255 ) diff --git a/ledger/ledger.go b/ledger/ledger.go index afe2c113..8b4796d9 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -99,7 +99,7 @@ var ( lock = sync.RWMutex{} ) -const version = 15 +const version = 16 func NewLedger(cfgFile string) *Ledger { lock.Lock() @@ -265,6 +265,7 @@ func (l *Ledger) upgrade() error { new(migration.MigrationV12ToV13), new(migration.MigrationV13ToV14), new(migration.MigrationV14ToV15), + new(migration.MigrationV15ToV16), } err = migration.Upgrade(ms, l.store) diff --git a/ledger/ledger_pending.go b/ledger/ledger_pending.go index 0e206295..5d292df0 100644 --- a/ledger/ledger_pending.go +++ b/ledger/ledger_pending.go @@ -81,10 +81,10 @@ func (l *Ledger) GetPendings(fn func(pendingKey *types.PendingKey, pendingInfo * } pendingInfo := new(types.PendingInfo) if err := pendingInfo.Deserialize(val); err != nil { - return fmt.Errorf("pendingKey deserialize: %s", err) + return fmt.Errorf("pendingInfo deserialize: %s", err) } if err := fn(pendingKey, pendingInfo); err != nil { - return fmt.Errorf("pendingKey deserialize: %s", err) + return fmt.Errorf("pending func: %s", err) } return nil }) diff --git a/ledger/migration/migration.go b/ledger/migration/migration.go index 5fd1ced9..bc1f438b 100644 --- a/ledger/migration/migration.go +++ b/ledger/migration/migration.go @@ -162,12 +162,12 @@ type MigrationV13ToV14 struct { func (m MigrationV13ToV14) Migrate(store storage.Store) error { return store.BatchWrite(false, func(batch storage.Batch) error { - fmt.Println("migrate ledger v13 to v14 ") b, err := checkVersion(m, store) if err != nil { return err } if b { + fmt.Println("migrate ledger v13 to v14 ") if err := store.Upgrade(m.StartVersion()); err != nil { return fmt.Errorf("migrate to %d error: %s", m.EndVersion(), err) } @@ -185,7 +185,7 @@ func (m MigrationV13ToV14) EndVersion() int { return 14 } -type contractKV struct { +type bytesKV struct { key []byte value []byte } @@ -195,13 +195,13 @@ type MigrationV14ToV15 struct { func (m MigrationV14ToV15) Migrate(store storage.Store) error { return store.BatchWrite(false, func(batch storage.Batch) error { - fmt.Println("migrate ledger v14 to v15 ") b, err := checkVersion(m, store) if err != nil { return err } if b { - cs := make([]contractKV, 0) + fmt.Println("migrate ledger v14 to v15 ") + cs := make([]bytesKV, 0) prefix, _ := storage.GetKeyOfParts(storage.KeyPrefixTrieVMStorage) if err := store.Iterator(prefix, nil, func(k, v []byte) error { //fmt.Println("==key ", k) @@ -209,7 +209,7 @@ func (m MigrationV14ToV15) Migrate(store storage.Store) error { copy(key, k) value := make([]byte, len(v)) copy(value, v) - c := contractKV{ + c := bytesKV{ key: key, value: value, } @@ -257,6 +257,114 @@ func (m MigrationV14ToV15) EndVersion() int { return 15 } +type pendingKV struct { + key *types.PendingKey + value []byte +} + +type MigrationV15ToV16 struct { +} + +func (m MigrationV15ToV16) Migrate(store storage.Store) error { + b, err := checkVersion(m, store) + if err != nil { + return err + } + if b { + fmt.Println("migrate ledger v15 to v16 ") + count := 0 + reset := false + bs := make([]bytesKV, 0) + pendingKvs := make([]*pendingKV, 0) + + // get all pending infos from db + prefix, _ := storage.GetKeyOfParts(storage.KeyPrefixPending) + err := store.Iterator(prefix, nil, func(k, v []byte) error { + key := make([]byte, len(k)) + copy(key, k) + value := make([]byte, len(v)) + copy(value, v) + c := bytesKV{ + key: key, + value: value, + } + bs = append(bs, c) + + pk := new(types.PendingKey) + if err := pk.Deserialize(key[1:]); err != nil { + if _, err := pk.UnmarshalMsg(key[1:]); err != nil { + return fmt.Errorf("pendingKey unmarshalMsg err: %s", err) + } + reset = true + } + pkv := &pendingKV{ + key: pk, + value: value, + } + pendingKvs = append(pendingKvs, pkv) + return nil + }) + if err != nil { + return fmt.Errorf("get pendings info err: %s", err) + } + + if reset { + // copy all pending infos to another table + err = store.BatchWrite(false, func(batch storage.Batch) error { + for _, b := range bs { + nk := make([]byte, 0) + nk = append(nk, storage.KeyPrefixPendingBackup) + nk = append(nk, b.key[1:]...) + if err := batch.Put(nk, b.value); err != nil { + return err + } + count++ + } + return nil + }) + if err != nil { + return fmt.Errorf("backup pending table error: %s", err) + } + + // reset pending table + if count != len(pendingKvs) { + return fmt.Errorf("pending count err: %d, %d", count, len(pendingKvs)) + } + return store.BatchWrite(false, func(batch storage.Batch) error { + for _, b := range bs { + if err := batch.Delete(b.key); err != nil { + return err + } + } + + for _, pkv := range pendingKvs { + pKey, err := storage.GetKeyOfParts(storage.KeyPrefixPending, pkv.key) + if err != nil { + return err + } + if err := batch.Put(pKey, pkv.value); err != nil { + return err + } + } + return updateVersion(m, batch) + }) + } else { + return store.BatchWrite(false, func(batch storage.Batch) error { + return updateVersion(m, batch) + }) + } + } + return nil +} + +func (m MigrationV15ToV16) StartVersion() int { + return 15 +} + +func (m MigrationV15ToV16) EndVersion() int { + return 16 +} + func checkVersion(m Migration, s storage.Store) (bool, error) { v, err := getVersion(s) if err != nil { diff --git a/ledger/migration/migration_test.go b/ledger/migration/migration_test.go index cd865fb6..f58273fa 100644 --- a/ledger/migration/migration_test.go +++ b/ledger/migration/migration_test.go @@ -2,6 +2,7 @@ package migration import ( "encoding/binary" + "math/big" "os" "path/filepath" "testing" @@ -72,7 +73,7 @@ func TestMigration_Migrate(t *testing.T) { if err := store.Put(frontierK, frontierV); err != nil { t.Fatal(err) } - migrations := []Migration{MigrationV11ToV12{}, MigrationV12ToV13{}, MigrationV13ToV14{}, MigrationV14ToV15{}} + migrations := []Migration{MigrationV11ToV12{}, MigrationV12ToV13{}, MigrationV13ToV14{}, MigrationV14ToV15{}, MigrationV15ToV16{}} if err := Upgrade(migrations, store); err != nil { t.Fatal(err) } @@ -119,3 +120,51 @@ func TestMigration_MigrateV14ToV15(t *testing.T) { } } + +func TestMigration_MigrateV15ToV16(t *testing.T) { + dir := filepath.Join(config.QlcTestDataDir(), "store", uuid.New().String()) + store, err := db.NewBadgerStore(dir) + if err != nil { + t.Fatal(err) + } + defer func() { + os.Remove(dir) + }() + + key := []byte{byte(storage.KeyPrefixVersion)} + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutVarint(buf, 15) + if err := store.Put(key, buf[:n]); err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + pendingKey := &types.PendingKey{ + Address: mock.Address(), + Hash: mock.Hash(), + } + pendingInfo := &types.PendingInfo{ + Source: mock.Address(), + Type: mock.Hash(), + Amount: types.Balance{Int: big.NewInt(100)}, + } + kBytes, err := pendingKey.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + pk := make([]byte, 0) + pk = append(pk, byte(storage.KeyPrefixPending)) + pk = append(pk, kBytes...) + iBytes, err := pendingInfo.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + if err := store.Put(pk, iBytes); err != nil { + t.Fatal(err) + } + } + migrations := []Migration{MigrationV15ToV16{}} + if err := Upgrade(migrations, store); err != nil { + t.Fatal(err) + } +} diff --git a/rpc/api/ledger_test.go b/rpc/api/ledger_test.go index 7728c7d9..ad68e2d2 100644 --- a/rpc/api/ledger_test.go +++ b/rpc/api/ledger_test.go @@ -261,8 +261,11 @@ func TestLedgerAPI_AccountBlocksCount(t *testing.T) { } func TestLedgerAPI_AccountHistoryTopn(t *testing.T) { - teardownTestCase, _, ledgerApi := setupDefaultLedgerAPI(t) + teardownTestCase, l, ledgerApi := setupDefaultLedgerAPI(t) defer teardownTestCase(t) + if err := l.Flush(); err != nil { + t.Fatal(err) + } r, err := ledgerApi.AccountHistoryTopn(account1.Address(), 100, nil) if err != nil { t.Fatal(err) diff --git a/rpc/grpc/apis/ledger_test.go b/rpc/grpc/apis/ledger_test.go index e4050722..adbf2494 100644 --- a/rpc/grpc/apis/ledger_test.go +++ b/rpc/grpc/apis/ledger_test.go @@ -140,8 +140,11 @@ func TestLedgerAPI_AccountBlocksCount(t *testing.T) { } func TestLedgerAPI_AccountHistoryTopn(t *testing.T) { - teardownTestCase, _, ledgerApi := setupDefaultLedgerAPI(t) + teardownTestCase, l, ledgerApi := setupDefaultLedgerAPI(t) defer teardownTestCase(t) + if err := l.Flush(); err != nil { + t.Fatal(err) + } r, err := ledgerApi.AccountHistoryTopn(context.Background(), &pb.AccountHistoryTopnReq{ Address: account1.Address().String(), Count: 100,