forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
358 lines (310 loc) · 12.9 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package ledgerstorage
import (
"sync"
"sync/atomic"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
)
// maxBlockFileSize is the size value (64 * 1024 * 1024) that NewProvider passes
// to fsblkstorage.NewConf when it builds the block store configuration.
const maxBlockFileSize = 64 * 1024 * 1024
// logger is the package-level logger, obtained under the name "ledgerstorage".
var logger = flogging.MustGetLogger("ledgerstorage")
// Provider encapsulates two providers: 1) the block store provider and
// 2) the pvt data store provider.
type Provider struct {
// blkStoreProvider is used to open per-ledger block stores and to check
// whether a ledger already exists.
blkStoreProvider blkstorage.BlockStoreProvider
// pvtdataStoreProvider is used to open per-ledger pvt data stores.
pvtdataStoreProvider pvtdatastorage.Provider
}
// Store encapsulates two stores: 1) the block store and 2) the pvt data store.
type Store struct {
// BlockStore is embedded, so its methods are promoted onto Store.
blkstorage.BlockStore
// pvtdataStore is the pvt data store paired with the embedded block store.
pvtdataStore pvtdatastorage.Store
// rwlock is write-locked by CommitWithPvtData and read-locked by
// GetPvtDataAndBlockByNum and GetPvtDataByNum.
rwlock sync.RWMutex
// isPvtstoreAheadOfBlockstore holds a bool. It is set by Provider.Open and
// reset to false by CommitWithPvtData.
isPvtstoreAheadOfBlockstore atomic.Value
}
// attrsToIndex lists the block attributes placed into the blkstorage.IndexConfig
// that NewProvider and Rollback hand to the fsblkstorage package.
var attrsToIndex = []blkstorage.IndexableAttr{
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
// NewProvider returns the handle to the provider.
//
// It first creates the block store provider through fsblkstorage, using
// blockStoreDir, maxBlockFileSize, the attrsToIndex index configuration and
// metricsProvider, and then creates the pvt data store provider from conf.
//
// If either step fails, a nil Provider and the error are returned. When the
// pvt data store provider cannot be created, the already-created block store
// provider is closed before returning so that it is not leaked.
func NewProvider(blockStoreDir string, conf *pvtdatastorage.PrivateDataConfig, metricsProvider metrics.Provider) (*Provider, error) {
	// Initialize the block storage
	blockStoreProvider, err := fsblkstorage.NewProvider(
		fsblkstorage.NewConf(blockStoreDir, maxBlockFileSize),
		&blkstorage.IndexConfig{AttrsToIndex: attrsToIndex},
		metricsProvider,
	)
	if err != nil {
		return nil, err
	}

	// Initialize the pvt data storage
	pvtStoreProvider, err := pvtdatastorage.NewProvider(conf)
	if err != nil {
		// The caller never receives a Provider on this path, so nobody else
		// could close the block store provider created above.
		blockStoreProvider.Close()
		return nil, err
	}

	return &Provider{
		blkStoreProvider:     blockStoreProvider,
		pvtdataStoreProvider: pvtStoreProvider,
	}, nil
}
// Open opens the store for the given ledger id.
//
// The block store and the pvt data store are opened through their respective
// providers, the combined Store is run through init(), and finally the
// isPvtstoreAheadOfBlockstore flag is set to whether the last committed block
// height of the pvt data store is greater than the height reported by the
// block store. Any error encountered along the way is returned with a nil Store.
func (p *Provider) Open(ledgerid string) (*Store, error) {
	blkStore, err := p.blkStoreProvider.OpenBlockStore(ledgerid)
	if err != nil {
		return nil, err
	}

	pvtStore, err := p.pvtdataStoreProvider.OpenStore(ledgerid)
	if err != nil {
		return nil, err
	}

	s := &Store{BlockStore: blkStore, pvtdataStore: pvtStore}
	if err = s.init(); err != nil {
		return nil, err
	}

	bcInfo, err := blkStore.GetBlockchainInfo()
	if err != nil {
		return nil, err
	}

	pvtHeight, err := pvtStore.LastCommittedBlockHeight()
	if err != nil {
		return nil, err
	}

	aheadOfBlockStore := pvtHeight > bcInfo.Height
	s.isPvtstoreAheadOfBlockstore.Store(aheadOfBlockStore)
	return s, nil
}
// Close closes the provider by closing the block store provider first and
// then the pvt data store provider.
func (p *Provider) Close() {
p.blkStoreProvider.Close()
p.pvtdataStoreProvider.Close()
}
// Exists reports whether a ledger with the given ledgerID is already present.
// The check is delegated to the block store provider only; the pvt data store
// provider is not consulted.
func (p *Provider) Exists(ledgerID string) (bool, error) {
return p.blkStoreProvider.Exists(ledgerID)
}
// Init initializes the store with essential configurations by passing the
// given btlPolicy to the underlying pvt data store.
func (s *Store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
s.pvtdataStore.Init(btlPolicy)
}
// CommitWithPvtData commits the block and the corresponding pvt data.
//
// The write lock on s.rwlock is held for the whole call. The pvt data is
// committed to the pvt data store first (unless that store already covers this
// block), and the block is added to the block store afterwards. The first
// error encountered is returned.
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
	blockNum := blockAndPvtdata.Block.Header.Number

	s.rwlock.Lock()
	defer s.rwlock.Unlock()

	pvtStoreHeight, err := s.pvtdataStore.LastCommittedBlockHeight()
	if err != nil {
		return err
	}

	// Height the pvt data store has once this block is part of it.
	heightAfterCommit := blockNum + 1

	if pvtStoreHeight >= heightAfterCommit {
		// The pvt data store sanity check does not allow rewriting the pvt data.
		// When re-processing blocks (rejoining the channel or re-fetching the last
		// few blocks), skip the pvt data commit to the pvtdata blockstore.
		logger.Debugf("Skipping writing block [%d] to pvt block store as the store height is [%d]", blockNum, pvtStoreHeight)
	} else {
		logger.Debugf("Writing block [%d] to pvt block store", blockNum)
		// If a state fork occurs during a regular block commit, there is a
		// mechanism to drop all blocks, followed by refetching and re-processing
		// them. Currently only the block files (and related artifacts) are
		// dropped; the pvtdatastorage is not dropped/overwritten as that might
		// lead to data loss. During block reprocessing an invalid pvtdata
		// transaction may become valid, so the pvtdata of invalid transactions is
		// stored in the pvtdataStore too, as is done for the public data in the
		// blockStore.
		pvtData, missingPvtData := constructPvtDataAndMissingData(blockAndPvtdata)
		if err = s.pvtdataStore.Commit(blockNum, pvtData, missingPvtData); err != nil {
			return err
		}
	}

	if err = s.AddBlock(blockAndPvtdata.Block); err != nil {
		return err
	}

	if pvtStoreHeight == heightAfterCommit {
		// We reach here only when the pvtdataStore was ahead of the blockStore
		// at store opening time (which would occur after a peer rollback/reset).
		s.isPvtstoreAheadOfBlockstore.Store(false)
	}
	return nil
}
// constructPvtDataAndMissingData walks the transactions of the block in
// sequence order and collects, for each transaction number, the pvt data entry
// found in blockAndPvtData.PvtData and the missing-data entries found in
// blockAndPvtData.MissingPvtData. Entries keyed by a number that is not a
// transaction index of the block are ignored. The returned slice is nil when
// no transaction has pvt data; the returned map is always non-nil.
func constructPvtDataAndMissingData(blockAndPvtData *ledger.BlockAndPvtData) ([]*ledger.TxPvtData,
	ledger.TxMissingPvtDataMap) {
	var collected []*ledger.TxPvtData
	missingByTx := make(ledger.TxMissingPvtDataMap)

	for idx := range blockAndPvtData.Block.Data.Data {
		txNum := uint64(idx)

		if txPvtData, found := blockAndPvtData.PvtData[txNum]; found {
			collected = append(collected, txPvtData)
		}

		// Looking up an absent key yields an empty value, so this loop simply
		// does not execute for transactions without missing-data records.
		for _, entry := range blockAndPvtData.MissingPvtData[txNum] {
			missingByTx.Add(txNum, entry.Namespace, entry.Collection, entry.IsEligible)
		}
	}

	return collected, missingByTx
}
// CommitPvtDataOfOldBlocks commits the pvtData of old blocks by handing
// blocksPvtData to the underlying pvtdata store and returning its error, if any.
func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
	return s.pvtdataStore.CommitPvtDataOfOldBlocks(blocksPvtData)
}
// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.
// The pvt data is filtered by the list of 'collections' supplied in filter.
// Both lookups happen under a read lock on s.rwlock; if either fails, nil and
// the error are returned.
func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) {
	s.rwlock.RLock()
	defer s.rwlock.RUnlock()

	var blk *common.Block
	blk, err := s.RetrieveBlockByNumber(blockNum)
	if err != nil {
		return nil, err
	}

	txsPvtData, err := s.getPvtDataByNumWithoutLock(blockNum, filter)
	if err != nil {
		return nil, err
	}

	result := &ledger.BlockAndPvtData{
		Block:   blk,
		PvtData: constructPvtdataMap(txsPvtData),
	}
	return result, nil
}
// GetPvtDataByNum returns only the pvt data corresponding to the given block number.
// The pvt data is filtered by the list of 'ns/collections' supplied in the filter.
// A nil filter does not filter any results.
// The lookup is performed while holding a read lock on s.rwlock.
func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
	s.rwlock.RLock()
	defer s.rwlock.RUnlock()

	txsPvtData, err := s.getPvtDataByNumWithoutLock(blockNum, filter)
	return txsPvtData, err
}
// getPvtDataByNumWithoutLock returns only the pvt data corresponding to the given
// block number, as reported by the underlying pvtdata store for the given filter.
// It does not acquire a read lock itself; in most circumstances the caller is
// expected to already hold a read lock on `s.rwlock`.
// On error the returned slice is nil.
func (s *Store) getPvtDataByNumWithoutLock(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
	txsPvtData, err := s.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter)
	if err != nil {
		return nil, err
	}
	return txsPvtData, nil
}
// DoesPvtDataInfoExist returns true when
// (1) the ledger has pvtdata associated with the given block number (or)
// (2) a few or all pvtdata associated with the given block number is missing but the
// missing info is recorded in the ledger (or)
// (3) the block is committed but does not contain any pvtData.
//
// Concretely, it reports whether blockNum is below the last committed block
// height of the pvtdata store. If that height cannot be read, false and the
// error are returned.
func (s *Store) DoesPvtDataInfoExist(blockNum uint64) (bool, error) {
	pvtStoreHt, err := s.pvtdataStore.LastCommittedBlockHeight()
	if err != nil {
		return false, err
	}
	// blockNum < height is the same test as blockNum+1 <= height, except that it
	// cannot wrap around to 0 (and wrongly report true) when blockNum is the
	// maximum uint64 value.
	return blockNum < pvtStoreHt, nil
}
// GetMissingPvtDataInfoForMostRecentBlocks invokes the function of the same name
// on the underlying pvtdata store, passing maxBlock through unchanged, and
// returns its result. No lock on s.rwlock is taken.
func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.MissingPvtDataInfo, error) {
// It is safe to not acquire a read lock on s.rwlock. Without a lock, the value of
// lastCommittedBlock can change due to a new block commit. As a result, we may not
// be able to fetch the missing data info of truly the most recent blocks. This
// decision was made to ensure that the regular block commit rate is not affected.
return s.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock)
}
// ProcessCollsEligibilityEnabled invokes the function of the same name on the
// underlying pvtdata store, forwarding committingBlk and nsCollMap unchanged,
// and returns its error, if any.
func (s *Store) ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap map[string][]string) error {
return s.pvtdataStore.ProcessCollsEligibilityEnabled(committingBlk, nsCollMap)
}
// GetLastUpdatedOldBlocksPvtData invokes the function of the same name on the
// underlying pvtdata store and returns its result, a map keyed by uint64 whose
// values are slices of TxPvtData, together with any error.
func (s *Store) GetLastUpdatedOldBlocksPvtData() (map[uint64][]*ledger.TxPvtData, error) {
return s.pvtdataStore.GetLastUpdatedOldBlocksPvtData()
}
// ResetLastUpdatedOldBlocksList invokes the function of the same name on the
// underlying pvtdata store and returns its error, if any.
func (s *Store) ResetLastUpdatedOldBlocksList() error {
return s.pvtdataStore.ResetLastUpdatedOldBlocksList()
}
// IsPvtStoreAheadOfBlockStore returns true when the pvtStore height is
// greater than the blockstore height. Otherwise, it returns false.
//
// The answer is read from the isPvtstoreAheadOfBlockstore flag, which
// Provider.Open sets and CommitWithPvtData resets. If the flag has never been
// stored (for example on a Store that was not obtained through Provider.Open),
// false is returned instead of panicking on the type assertion.
func (s *Store) IsPvtStoreAheadOfBlockStore() bool {
	ahead, ok := s.isPvtstoreAheadOfBlockstore.Load().(bool)
	return ok && ahead
}
// TODO: FAB-16297 -- Remove init() as it is no longer needed. The private data feature
// became stable from v1.2 onwards. To allow the initiation of pvtdata store with non-zero
// block height (mainly during a rolling upgrade from an existing v1.1 network to v1.2),
// we introduced pvtdata init() function which would take the height of block store and
// set it as a height of pvtdataStore. From v2.0 onwards, it is no longer needed as we do
// not support a rolling upgrade from v1.1 to v2.0

// init invokes `initPvtdataStoreFromExistingBlockchain` in order to check whether
// the pvtdata store needs to be brought up to date with an already existing
// blockchain (as happens after upgrading a peer from an older version).
// Whether the pvtdata store was actually updated makes no difference to the
// caller; only the error from that step, if any, is returned.
func (s *Store) init() error {
	_, err := s.initPvtdataStoreFromExistingBlockchain()
	return err
}
// initPvtdataStoreFromExistingBlockchain updates the initial state of the pvtdata store
// if an existing block store has a blockchain and the pvtdata store is empty.
// This situation is expected to happen when a peer is upgraded from version 1.0
// and an existing blockchain is present that was generated with version 1.0.
// Under this scenario, the pvtdata store is brought up to the point as if it had
// processed the existing blocks with no pvt data: its last committed block is
// set to the block store height minus one.
// It returns true only if the above condition held and the pvtdata store was
// successfully updated; on any error it returns false together with the error.
func (s *Store) initPvtdataStoreFromExistingBlockchain() (bool, error) {
	var bcInfo *common.BlockchainInfo
	bcInfo, err := s.BlockStore.GetBlockchainInfo()
	if err != nil {
		return false, err
	}

	isEmpty, err := s.pvtdataStore.IsEmpty()
	if err != nil {
		return false, err
	}

	// Nothing to do unless the pvtdata store is empty while blocks already exist.
	if !isEmpty || bcInfo.Height == 0 {
		return false, nil
	}

	if err = s.pvtdataStore.InitLastCommittedBlock(bcInfo.Height - 1); err != nil {
		return false, err
	}
	return true, nil
}
// constructPvtdataMap indexes the given pvt data entries by their SeqInBlock
// value. A nil slice yields a nil map; a non-nil (possibly empty) slice yields
// a non-nil map. When several entries share a SeqInBlock, the last one wins.
func constructPvtdataMap(pvtdata []*ledger.TxPvtData) ledger.TxPvtDataMap {
	if pvtdata == nil {
		return nil
	}

	bySeq := make(ledger.TxPvtDataMap)
	for i := range pvtdata {
		entry := pvtdata[i]
		bySeq[entry.SeqInBlock] = entry
	}
	return bySeq
}
// LoadPreResetHeight returns the pre reset height for the specified ledgers.
// It delegates to fsblkstorage.LoadPreResetHeight with the same arguments; the
// result is a map from string to uint64 height.
func LoadPreResetHeight(blockstorePath string, ledgerIDs []string) (map[string]uint64, error) {
return fsblkstorage.LoadPreResetHeight(blockstorePath, ledgerIDs)
}
// ResetBlockStore resets all ledgers to the genesis block.
// It delegates to fsblkstorage.ResetBlockStore for the block store located at
// blockstorePath and returns its error, if any.
func ResetBlockStore(blockstorePath string) error {
return fsblkstorage.ResetBlockStore(blockstorePath)
}
// ValidateRollbackParams performs necessary validation on the input given for
// the rollback operation. It delegates to fsblkstorage.ValidateRollbackParams
// with the same arguments and returns its error, if any.
func ValidateRollbackParams(blockstorePath, ledgerID string, blockNum uint64) error {
return fsblkstorage.ValidateRollbackParams(blockstorePath, ledgerID, blockNum)
}
// Rollback reverts changes made to the block store beyond a given block number.
func Rollback(blockstorePath, ledgerID string, blockNum uint64) error {
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
return fsblkstorage.Rollback(blockstorePath, ledgerID, blockNum, indexConfig)
}