This repository has been archived by the owner on Apr 9, 2024. It is now read-only.
/
db.go
198 lines (166 loc) · 5.76 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package history
import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/dataformat"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
protoutil "github.com/hyperledger/fabric/protoutil"
)
// logger is the package-level logger, obtained from flogging under the name "history".
var logger = flogging.MustGetLogger("history")
// DBProvider provides handle to HistoryDB for a given channel.
// It wraps a single leveldbhelper.Provider from which the named DB handles
// returned by GetDBHandle are obtained.
type DBProvider struct {
// leveldbProvider is the leveldb provider created by NewDBProvider.
leveldbProvider *leveldbhelper.Provider
}
// NewDBProvider instantiates a DBProvider whose leveldb provider is opened at
// the given path with the expected data format version dataformat.Version20.
//
// It returns a nil provider together with the error reported by
// leveldbhelper.NewProvider if the underlying provider cannot be created.
func NewDBProvider(path string) (*DBProvider, error) {
	logger.Debugf("constructing HistoryDBProvider dbPath=%s", path)

	conf := &leveldbhelper.Conf{
		DBPath:                path,
		ExpectedFormatVersion: dataformat.Version20,
	}
	provider, err := leveldbhelper.NewProvider(conf)
	if err != nil {
		return nil, err
	}
	return &DBProvider{leveldbProvider: provider}, nil
}
// GetDBHandle gets the handle to the history database with the given name.
// The returned DB wraps the leveldb handle obtained from the provider and
// remembers the name for use in log messages. The error result is always nil.
func (p *DBProvider) GetDBHandle(name string) (*DB, error) {
	handle := p.leveldbProvider.GetDBHandle(name)
	db := &DB{
		levelDB: handle,
		name:    name,
	}
	return db, nil
}
// Close closes the underlying db by delegating to the wrapped leveldb
// provider's Close method.
func (p *DBProvider) Close() {
p.leveldbProvider.Close()
}
// DB maintains and provides access to history data for a particular channel.
type DB struct {
// levelDB is the handle through which history records and the savepoint are read and written.
levelDB *leveldbhelper.DBHandle
// name is the database name given to GetDBHandle; it is used as the channel label in log messages.
name string
}
// Commit implements method in HistoryDB interface.
//
// For every transaction in the block that is not flagged invalid in the
// block's transactions filter and whose channel header type is
// ENDORSER_TRANSACTION, one history record is queued per key written in the
// transaction's read-write set. The record key is built by constructDataKey
// from (namespace, key, block number, transaction number) and the value is
// emptyValue. A savepoint (block number, number of transactions in the block)
// is queued under savePointKey, and the whole batch is written in one call.
//
// Any error from decoding an envelope, payload, channel header, action or
// read-write set, or from writing the batch, is returned as-is and nothing is
// written for the block.
func (d *DB) Commit(block *common.Block) error {
	blockNo := block.Header.Number
	batch := leveldbhelper.NewUpdateBatch()
	envelopes := block.Data.Data

	logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
		d.name, blockNo, len(envelopes))

	// The transactions filter carries one validation flag per transaction in the block.
	flags := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

	for i, envBytes := range envelopes {
		// The transaction number is the position of the envelope within the block.
		tranNo := uint64(i)

		// Invalid transactions contribute no history records.
		if flags.IsInvalid(i) {
			logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
				d.name, tranNo)
			continue
		}

		env, err := protoutil.GetEnvelopeFromBlock(envBytes)
		if err != nil {
			return err
		}
		payload, err := protoutil.UnmarshalPayload(env.Payload)
		if err != nil {
			return err
		}
		chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			return err
		}

		// Only endorser transactions are inspected for writes.
		if common.HeaderType(chdr.Type) != common.HeaderType_ENDORSER_TRANSACTION {
			logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
			continue
		}

		// Extract the read-write set from the transaction's action.
		action, err := protoutil.GetActionFromEnvelope(envBytes)
		if err != nil {
			return err
		}
		rwSet := new(rwsetutil.TxRwSet)
		if err := rwSet.FromProtoBytes(action.Results); err != nil {
			return err
		}

		// Queue one history record per write, across all namespaces.
		for _, nsRW := range rwSet.NsRwSets {
			ns := nsRW.NameSpace
			for _, write := range nsRW.KvRwSet.Writes {
				// The key alone carries the information; emptyValue is stored
				// because a nil value cannot be Put.
				batch.Put(constructDataKey(ns, write.Key, blockNo, tranNo), emptyValue)
			}
		}
	}

	// Record the savepoint used for recovery: the block number paired with the
	// count of transactions processed in this block.
	savepoint := version.NewHeight(blockNo, uint64(len(envelopes)))
	batch.Put(savePointKey, savepoint.ToBytes())

	// Write the history records and the savepoint together. Sync is set to
	// true as a precaution; false may be an acceptable optimization after
	// further testing.
	if err := d.levelDB.WriteBatch(batch, true); err != nil {
		return err
	}

	logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", d.name, blockNo)
	return nil
}
// NewQueryExecutor implements method in HistoryDB interface.
// It returns a QueryExecutor built from this DB's leveldb handle and the
// supplied block store. The error result is always nil.
func (d *DB) NewQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error) {
	executor := &QueryExecutor{d.levelDB, blockStore}
	return executor, nil
}
// GetLastSavepoint returns the height up to which history is present in the
// db, as stored under savePointKey.
//
// It returns (nil, nil) when no savepoint has been stored, and (nil, err) when
// reading the key or decoding the stored bytes fails.
func (d *DB) GetLastSavepoint() (*version.Height, error) {
	raw, err := d.levelDB.Get(savePointKey)
	if err != nil {
		return nil, err
	}
	if raw == nil {
		// No savepoint has been written.
		return nil, nil
	}

	// The second return value of NewHeightFromBytes is not needed here.
	savepoint, _, err := version.NewHeightFromBytes(raw)
	if err != nil {
		return nil, err
	}
	return savepoint, nil
}
// ShouldRecover implements method in interface kvledger.Recoverer.
//
// It returns whether recovery is needed and the block number to start from:
//   - if the savepoint cannot be read, (false, 0, err);
//   - if no savepoint exists, (true, 0, nil);
//   - otherwise recovery is needed when the savepoint's block number differs
//     from lastAvailableBlock, and the start block is the savepoint's block
//     number plus one.
func (d *DB) ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error) {
	savepoint, err := d.GetLastSavepoint()
	switch {
	case err != nil:
		return false, 0, err
	case savepoint == nil:
		return true, 0, nil
	}

	needsRecovery := savepoint.BlockNum != lastAvailableBlock
	return needsRecovery, savepoint.BlockNum + 1, nil
}
// Name returns the name of the database that manages historical states.
// The value is the fixed string "history" and does not depend on the receiver.
func (d *DB) Name() string {
	const dbName = "history"
	return dbName
}
// CommitLostBlock implements method in interface kvledger.Recoverer.
// It recommits the block carried by blockAndPvtdata to the history database
// via Commit and returns Commit's error, if any.
func (d *DB) CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvtData) error {
	block := blockAndPvtdata.Block
	blockNum := block.Header.Number

	// Every 1000th block is logged at Info level so that history rebuild
	// progress can be tracked in production environments; all others at Debug.
	if blockNum%1000 != 0 {
		logger.Debugf("Recommitting block [%d] to history database", blockNum)
	} else {
		logger.Infof("Recommitting block [%d] to history database", blockNum)
	}

	return d.Commit(block)
}