-
Notifications
You must be signed in to change notification settings - Fork 199
/
chainStorer.go
184 lines (147 loc) · 4.48 KB
/
chainStorer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package dataRetriever
import (
"fmt"
"sync"
"github.com/ElrondNetwork/elrond-go/storage"
)
var _ StorageService = (*ChainStorer)(nil)
// ChainStorer is a StorageService implementation that can hold multiple storages
// grouped by storage unit type
type ChainStorer struct {
lock sync.RWMutex
chain map[UnitType]storage.Storer
}
// NewChainStorer returns a new initialised ChainStorer
func NewChainStorer() *ChainStorer {
return &ChainStorer{
chain: make(map[UnitType]storage.Storer),
}
}
// AddStorer will add a new storer to the chain map
func (bc *ChainStorer) AddStorer(key UnitType, s storage.Storer) {
bc.lock.Lock()
bc.chain[key] = s
bc.lock.Unlock()
}
// GetStorer returns the storer from the chain map or nil if the storer was not found with error
func (bc *ChainStorer) GetStorer(unitType UnitType) (storage.Storer, error) {
bc.lock.RLock()
storer, ok := bc.chain[unitType]
bc.lock.RUnlock()
if !ok {
return nil, fmt.Errorf("%s %w", unitType.String(), ErrStorerNotFound)
}
return storer, nil
}
// Has returns true if the key is found in the selected Unit or false otherwise
// It can return an error if the provided unit type is not supported or if the
// underlying implementation of the storage unit reports an error.
func (bc *ChainStorer) Has(unitType UnitType, key []byte) error {
bc.lock.RLock()
storer := bc.chain[unitType]
bc.lock.RUnlock()
if storer == nil {
return ErrNoSuchStorageUnit
}
return storer.Has(key)
}
// Get returns the value for the given key if found in the selected storage unit,
// nil otherwise. It can return an error if the provided unit type is not supported
// or if the storage unit underlying implementation reports an error
func (bc *ChainStorer) Get(unitType UnitType, key []byte) ([]byte, error) {
bc.lock.RLock()
storer := bc.chain[unitType]
bc.lock.RUnlock()
if storer == nil {
return nil, ErrNoSuchStorageUnit
}
return storer.Get(key)
}
// Put stores the key, value pair in the selected storage unit
// It can return an error if the provided unit type is not supported
// or if the storage unit underlying implementation reports an error
func (bc *ChainStorer) Put(unitType UnitType, key []byte, value []byte) error {
bc.lock.RLock()
storer := bc.chain[unitType]
bc.lock.RUnlock()
if storer == nil {
return fmt.Errorf("%w for unit type %s", ErrNoSuchStorageUnit, unitType.String())
}
return storer.Put(key, value)
}
// SetEpochForPutOperation will set the epoch to be used in all persisters for the put operation
func (bc *ChainStorer) SetEpochForPutOperation(epoch uint32) {
bc.lock.Lock()
for _, storer := range bc.chain {
storerWithPutInEpoch, ok := storer.(storage.StorerWithPutInEpoch)
if !ok {
continue
}
storerWithPutInEpoch.SetEpochForPutOperation(epoch)
}
bc.lock.Unlock()
}
// GetAll gets all the elements with keys in the keys array, from the selected storage unit
// It can report an error if the provided unit type is not supported, if there is a missing
// key in the unit, or if the underlying implementation of the storage unit reports an error.
func (bc *ChainStorer) GetAll(unitType UnitType, keys [][]byte) (map[string][]byte, error) {
bc.lock.RLock()
storer := bc.chain[unitType]
bc.lock.RUnlock()
if storer == nil {
return nil, ErrNoSuchStorageUnit
}
m := map[string][]byte{}
for _, key := range keys {
val, err := storer.Get(key)
if err != nil {
return nil, err
}
m[string(key)] = val
}
return m, nil
}
// GetAllStorers returns all the available storers
func (bc *ChainStorer) GetAllStorers() map[UnitType]storage.Storer {
bc.lock.RLock()
chainMapCopy := make(map[UnitType]storage.Storer, len(bc.chain))
for key, value := range bc.chain {
chainMapCopy[key] = value
}
bc.lock.RUnlock()
return chainMapCopy
}
// Destroy removes the underlying files/resources used by the storage service
func (bc *ChainStorer) Destroy() error {
bc.lock.Lock()
defer bc.lock.Unlock()
var err error
for _, v := range bc.chain {
err = v.DestroyUnit()
if err != nil {
return err
}
}
bc.chain = nil
return nil
}
// CloseAll will close all the active units
func (bc *ChainStorer) CloseAll() error {
bc.lock.Lock()
defer bc.lock.Unlock()
closedSuccessfully := true
for _, unit := range bc.chain {
err := unit.Close()
if err != nil {
closedSuccessfully = false
}
}
if closedSuccessfully {
return nil
}
return storage.ErrClosingPersisters
}
// IsInterfaceNil returns true if there is no value under the interface
func (bc *ChainStorer) IsInterfaceNil() bool {
return bc == nil
}