-
Notifications
You must be signed in to change notification settings - Fork 2
/
dbfactory_tikv.go
126 lines (102 loc) · 3.35 KB
/
dbfactory_tikv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package shardchain
import (
"fmt"
"io"
"sync"
"github.com/PositionExchange/posichain/internal/tikv"
tikvCommon "github.com/PositionExchange/posichain/internal/tikv/common"
"github.com/PositionExchange/posichain/internal/tikv/prefix"
"github.com/PositionExchange/posichain/internal/tikv/remote"
"github.com/PositionExchange/posichain/internal/tikv/statedb_cache"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
)
// LDBTiKVPrefix is the leading component of the per-shard key prefix that
// getRemoteDB builds as "<LDBTiKVPrefix>_<shardID>/".
const LDBTiKVPrefix = "posichain_tikv"
// TiKvCacheConfig groups state DB cache settings.
//
// NOTE(review): nothing in the visible code reads this type; TiKvFactory
// carries a statedb_cache.StateDBCacheConfig instead. The field notes below
// are inferred from the field names only and should be confirmed against the
// code that consumes this struct.
type TiKvCacheConfig struct {
// presumably the cache size budget in megabytes — TODO confirm
StateDBCacheSizeInMB uint32
// presumably a filesystem path where the cache is persisted — TODO confirm
StateDBCachePersistencePath string
// presumably the address of a Redis server used by the cache — TODO confirm
StateDBRedisServerAddr string
// presumably a Redis LRU/expiry window expressed in days — TODO confirm
StateDBRedisLRUTimeInDay uint32
}
// TiKvFactory is a blockchain database factory whose databases are built on
// top of a remote database opened via remote.NewRemoteDatabase.
//
// Every handle the factory creates is cached, so repeated requests for the
// same shard and role return the same instance. The struct contains a
// sync.Map and therefore must not be copied after first use; all methods use
// a pointer receiver.
type TiKvFactory struct {
// cacheDBMap holds the created handles, keyed by "remote_<shardID>_<role>"
// and "state_db_<shardID>_<role>".
cacheDBMap sync.Map
// PDAddr is passed to remote.NewRemoteDatabase when a remote handle is opened.
// presumably the TiKV placement-driver addresses — TODO confirm
PDAddr []string
// Role is part of every cache key. It is also compared against
// tikv.RoleReader to derive the boolean flag handed to the remote and
// state-cache constructors.
Role string
// CacheConfig is forwarded to statedb_cache.NewStateDBCacheDatabase.
CacheConfig statedb_cache.StateDBCacheConfig
}
// getRemoteDB returns the prefixed remote database handle for shardID,
// creating and caching it on first use.
//
// Handles are cached in f.cacheDBMap under "remote_<shardID>_<role>". A new
// handle wraps a freshly opened remote database with the key prefix
// "<LDBTiKVPrefix>_<shardID>/". When two callers race to create the handle,
// the one stored first wins; the loser's handle is closed and the stored one
// is returned instead.
//
// An error is returned only when the remote database cannot be opened.
func (f *TiKvFactory) getRemoteDB(shardID uint32) (*prefix.PrefixDatabase, error) {
	cacheKey := fmt.Sprintf("remote_%d_%s", shardID, f.Role)
	if cached, found := f.cacheDBMap.Load(cacheKey); found {
		return cached.(*prefix.PrefixDatabase), nil
	}

	remoteDB, err := remote.NewRemoteDatabase(f.PDAddr, f.Role == tikv.RoleReader)
	if err != nil {
		return nil, err
	}
	keyPrefix := []byte(fmt.Sprintf("%s_%d/", LDBTiKVPrefix, shardID))
	created := prefix.NewPrefixDatabase(keyPrefix, remoteDB)

	winner, alreadyStored := f.cacheDBMap.LoadOrStore(cacheKey, created)
	if !alreadyStored {
		return created, nil
	}
	// Another goroutine stored its handle first: discard ours (best effort,
	// the close error is intentionally ignored) and hand back the cached one.
	_ = created.Close()
	return winner.(*prefix.PrefixDatabase), nil
}
// getStateDB returns the state DB cache database for shardID, creating and
// caching it on first use.
//
// Handles are cached in f.cacheDBMap under "state_db_<shardID>_<role>". A new
// handle is built by statedb_cache.NewStateDBCacheDatabase on top of the
// shard's remote database (see getRemoteDB) using f.CacheConfig. When two
// callers race to create the handle, the one stored first wins; the loser's
// handle is closed and the stored one is returned instead.
//
// An error is returned when either the remote database or the cache database
// cannot be created.
func (f *TiKvFactory) getStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) {
	cacheKey := fmt.Sprintf("state_db_%d_%s", shardID, f.Role)
	if cached, found := f.cacheDBMap.Load(cacheKey); found {
		return cached.(*statedb_cache.StateDBCacheDatabase), nil
	}

	remoteDB, err := f.getRemoteDB(shardID)
	if err != nil {
		return nil, err
	}
	created, err := statedb_cache.NewStateDBCacheDatabase(remoteDB, f.CacheConfig, f.Role == tikv.RoleReader)
	if err != nil {
		return nil, err
	}

	winner, alreadyStored := f.cacheDBMap.LoadOrStore(cacheKey, created)
	if !alreadyStored {
		return created, nil
	}
	// Another goroutine stored its handle first: discard ours (best effort,
	// the close error is intentionally ignored) and hand back the cached one.
	_ = created.Close()
	return winner.(*statedb_cache.StateDBCacheDatabase), nil
}
// NewChainDB returns the blockchain database for the given shard.
//
// The shard's cached remote database (see getRemoteDB) is adapted to an
// ethdb.KeyValueStore and wrapped with rawdb.NewDatabase. Any error from
// obtaining the remote database is returned unchanged.
func (f *TiKvFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
	remoteDB, err := f.getRemoteDB(shardID)
	if err != nil {
		return nil, err
	}
	return rawdb.NewDatabase(tikvCommon.ToEthKeyValueStore(remoteDB)), nil
}
// NewStateDB returns the state database for the given shard.
//
// The shard's cached state DB cache database (see getStateDB) is adapted to
// an ethdb.KeyValueStore and wrapped with rawdb.NewDatabase. Any error from
// obtaining the cache database is returned unchanged.
func (f *TiKvFactory) NewStateDB(shardID uint32) (ethdb.Database, error) {
	stateCache, err := f.getStateDB(shardID)
	if err != nil {
		return nil, err
	}
	return rawdb.NewDatabase(tikvCommon.ToEthKeyValueStore(stateCache)), nil
}
// NewCacheStateDB returns the shard's state DB cache database itself, without
// the ethdb wrapping applied by NewStateDB. It is the exported entry point to
// getStateDB and returns that call's results unchanged.
func (f *TiKvFactory) NewCacheStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) {
	stateCache, err := f.getStateDB(shardID)
	return stateCache, err
}
// CloseAllDB closes every cached database handle that implements io.Closer.
//
// Closing is best effort: errors returned by Close are ignored and iteration
// always continues to the next entry. Entries are left in the cache after
// being closed.
func (f *TiKvFactory) CloseAllDB() {
	closeEntry := func(_, entry interface{}) bool {
		closer, isCloser := entry.(io.Closer)
		if isCloser {
			_ = closer.Close()
		}
		// Returning true keeps Range going over the remaining entries.
		return true
	}
	f.cacheDBMap.Range(closeEntry)
}