forked from hyperledger/fabric
/
lockbased_txmgr.go
168 lines (148 loc) · 5.9 KB
/
lockbased_txmgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package lockbasedtxmgr
import (
"sync"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/common"
)
var logger = flogging.MustGetLogger("lockbasedtxmgr")
// LockBasedTxMgr a simple implementation of interface `txmgmt.TxMgr`.
// This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing
type LockBasedTxMgr struct {
ledgerid string
db privacyenabledstate.DB
validator validator.Validator
batch *privacyenabledstate.UpdateBatch
currentBlock *common.Block
stateListeners ledger.StateListeners
commitRWLock sync.RWMutex
}
// NewLockBasedTxMgr constructs a new instance of NewLockBasedTxMgr
func NewLockBasedTxMgr(ledgerid string, db privacyenabledstate.DB, stateListeners ledger.StateListeners) *LockBasedTxMgr {
db.Open()
txmgr := &LockBasedTxMgr{ledgerid: ledgerid, db: db, stateListeners: stateListeners}
txmgr.validator = valimpl.NewStatebasedValidator(txmgr, db)
return txmgr
}
// GetLastSavepoint returns the block num recorded in savepoint,
// returns 0 if NO savepoint is found
func (txmgr *LockBasedTxMgr) GetLastSavepoint() (*version.Height, error) {
return txmgr.db.GetLatestSavePoint()
}
// NewQueryExecutor implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewQueryExecutor(txid string) (ledger.QueryExecutor, error) {
qe := newQueryExecutor(txmgr, txid)
txmgr.commitRWLock.RLock()
return qe, nil
}
// NewTxSimulator implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
logger.Debugf("constructing new tx simulator")
s, err := newLockBasedTxSimulator(txmgr, txid)
if err != nil {
return nil, err
}
txmgr.commitRWLock.RLock()
return s, nil
}
// ValidateAndPrepare implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) error {
block := blockAndPvtdata.Block
logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
batch, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
if err != nil {
txmgr.clearCache()
return err
}
txmgr.currentBlock = block
txmgr.batch = batch
return txmgr.invokeNamespaceListeners(batch)
}
func (txmgr *LockBasedTxMgr) invokeNamespaceListeners(batch *privacyenabledstate.UpdateBatch) error {
namespaces := batch.PubUpdates.GetUpdatedNamespaces()
for _, namespace := range namespaces {
listener := txmgr.stateListeners[namespace]
if listener == nil {
continue
}
logger.Debugf("Invoking listener for state changes over namespace:%s", namespace)
updatesMap := batch.PubUpdates.GetUpdates(namespace)
var kvwrites []*kvrwset.KVWrite
for key, versionedValue := range updatesMap {
kvwrites = append(kvwrites, &kvrwset.KVWrite{Key: key, IsDelete: versionedValue.Value == nil, Value: versionedValue.Value})
}
if err := listener.HandleStateUpdates(txmgr.ledgerid, kvwrites); err != nil {
return err
}
}
return nil
}
// Shutdown implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Shutdown() {
txmgr.db.Close()
}
// Commit implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Commit() error {
// If statedb implementation needed bulk read optimization, cache might have been populated by
// ValidateAndPrepare(). Once the block is validated and committed, populated cache needs to
// be cleared.
defer txmgr.clearCache()
logger.Debugf("Committing updates to state database")
txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
logger.Debugf("Write lock acquired for committing updates to state database")
if txmgr.batch == nil {
panic("validateAndPrepare() method should have been called before calling commit()")
}
defer func() { txmgr.batch = nil }()
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.batch,
version.NewHeight(txmgr.currentBlock.Header.Number, uint64(len(txmgr.currentBlock.Data.Data)-1))); err != nil {
return err
}
logger.Debugf("Updates committed to state database")
return nil
}
// Rollback implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Rollback() {
txmgr.batch = nil
// If statedb implementation needed bulk read optimization, cache might have been populated by
// ValidateAndPrepareBatch(). As the block commit is rollbacked, populated cache needs to
// be cleared now.
txmgr.clearCache()
}
// clearCache empty the cache maintained by the statedb implementation
func (txmgr *LockBasedTxMgr) clearCache() {
if txmgr.db.IsBulkOptimizable() {
txmgr.db.ClearCachedVersions()
}
}
// ShouldRecover implements method in interface kvledger.Recoverer
func (txmgr *LockBasedTxMgr) ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error) {
savepoint, err := txmgr.GetLastSavepoint()
if err != nil {
return false, 0, err
}
if savepoint == nil {
return true, 0, nil
}
return savepoint.BlockNum != lastAvailableBlock, savepoint.BlockNum + 1, nil
}
// CommitLostBlock implements method in interface kvledger.Recoverer
func (txmgr *LockBasedTxMgr) CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvtData) error {
block := blockAndPvtdata.Block
logger.Debugf("Constructing updateSet for the block %d", block.Header.Number)
if err := txmgr.ValidateAndPrepare(blockAndPvtdata, false); err != nil {
return err
}
logger.Debugf("Committing block %d to state database", block.Header.Number)
return txmgr.Commit()
}