forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper.go
204 lines (190 loc) · 7.8 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package valimpl
import (
"bytes"
"fmt"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/customtx"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valinternal"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
)
// validateAndPreparePvtBatch pulls out the private write-set for the transactions that are marked as valid
// by the internal public data validator. Finally, it validates (if not already self-endorsed) the pvt rwset against the
// corresponding hash present in the public rwset
func validateAndPreparePvtBatch(block *valinternal.Block, pvtdata map[uint64]*ledger.TxPvtData) (*privacyenabledstate.PvtUpdateBatch, error) {
pvtUpdates := privacyenabledstate.NewPvtUpdateBatch()
for _, tx := range block.Txs {
if tx.ValidationCode != peer.TxValidationCode_VALID {
continue
}
if !tx.ContainsPvtWrites() {
continue
}
txPvtdata := pvtdata[uint64(tx.IndexInBlock)]
if txPvtdata == nil {
continue
}
if requiresPvtdataValidation(txPvtdata) {
if err := validatePvtdata(tx, txPvtdata); err != nil {
return nil, err
}
}
var pvtRWSet *rwsetutil.TxPvtRwSet
var err error
if pvtRWSet, err = rwsetutil.TxPvtRwSetFromProtoMsg(txPvtdata.WriteSet); err != nil {
return nil, err
}
addPvtRWSetToPvtUpdateBatch(pvtRWSet, pvtUpdates, version.NewHeight(block.Num, uint64(tx.IndexInBlock)))
}
return pvtUpdates, nil
}
// requiresPvtdataValidation returns whether or not a hashes of the collection should be computed
// for the collections of present in the private data
// TODO for now always return true. Add capabilty of checking if this data was produced by
// the validating peer itself during similation and in that case return false
func requiresPvtdataValidation(tx *ledger.TxPvtData) bool {
return true
}
// validPvtdata returns true if hashes of all the collections writeset present in the pvt data
// match with the corresponding hashes present in the public read-write set
func validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) error {
if pvtdata.WriteSet == nil {
return nil
}
for _, nsPvtdata := range pvtdata.WriteSet.NsPvtRwset {
for _, collPvtdata := range nsPvtdata.CollectionPvtRwset {
collPvtdataHash := util.ComputeHash(collPvtdata.Rwset)
hashInPubdata := tx.RetrieveHash(nsPvtdata.Namespace, collPvtdata.CollectionName)
if !bytes.Equal(collPvtdataHash, hashInPubdata) {
return &validator.ErrPvtdataHashMissmatch{
Msg: fmt.Sprintf(`Hash of pvt data for collection [%s:%s] does not match with the corresponding hash in the public data.
public hash = [%#v], pvt data hash = [%#v]`, nsPvtdata.Namespace, collPvtdata.CollectionName, hashInPubdata, collPvtdataHash),
}
}
}
}
return nil
}
// preprocessProtoBlock parses the proto instance of block into 'Block' structure.
// The retuned 'Block' structure contains only transactions that are endorser transactions and are not alredy marked as invalid
func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error) {
b := &valinternal.Block{Num: block.Header.Number}
// Committer validator has already set validation flags based on well formed tran checks
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Precaution in case committer validator has not added validation flags yet
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex, envBytes := range block.Data.Data {
var env *common.Envelope
var chdr *common.ChannelHeader
var payload *common.Payload
var err error
if env, err = utils.GetEnvelopeFromBlock(envBytes); err == nil {
if payload, err = utils.GetPayload(env); err == nil {
chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
}
}
if txsFilter.IsInvalid(txIndex) {
// Skipping invalid transaction
logger.Warningf("Channel [%s]: Block [%d] Transaction index [%d] TxId [%s]"+
" marked as invalid by committer. Reason code [%s]",
chdr.GetChannelId(), block.Header.Number, txIndex, chdr.GetTxId(),
txsFilter.Flag(txIndex).String())
continue
}
if err != nil {
return nil, err
}
var txRWSet *rwsetutil.TxRwSet
txType := common.HeaderType(chdr.Type)
logger.Debugf("txType=%s", txType)
if txType == common.HeaderType_ENDORSER_TRANSACTION {
// extract actions from the envelope message
respPayload, err := utils.GetActionFromEnvelope(envBytes)
if err != nil {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_NIL_TXACTION)
continue
}
txRWSet = &rwsetutil.TxRwSet{}
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
} else {
rwsetProto, err := processNonEndorserTx(env, chdr.TxId, txType, txmgr, !doMVCCValidation)
if _, ok := err.(*customtx.InvalidTxError); ok {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
if err != nil {
return nil, err
}
if rwsetProto != nil {
if txRWSet, err = rwsetutil.TxRwSetFromProtoMsg(rwsetProto); err != nil {
return nil, err
}
}
}
if txRWSet != nil {
b.Txs = append(b.Txs, &valinternal.Transaction{IndexInBlock: txIndex, ID: chdr.TxId, RWSet: txRWSet})
}
}
return b, nil
}
func processNonEndorserTx(txEnv *common.Envelope, txid string, txType common.HeaderType, txmgr txmgr.TxMgr, synchingState bool) (*rwset.TxReadWriteSet, error) {
logger.Debugf("Performing custom processing for transaction [txid=%s], [txType=%s]", txid, txType)
processor := customtx.GetProcessor(txType)
logger.Debugf("Processor for custom tx processing:%#v", processor)
if processor == nil {
return nil, nil
}
var err error
var sim ledger.TxSimulator
var simRes *ledger.TxSimulationResults
if sim, err = txmgr.NewTxSimulator(txid); err != nil {
return nil, err
}
defer sim.Done()
if err = processor.GenerateSimulationResults(txEnv, sim, synchingState); err != nil {
return nil, err
}
if simRes, err = sim.GetTxSimulationResults(); err != nil {
return nil, err
}
return simRes.PubSimulationResults, nil
}
// postprocessProtoBlock updates the proto block's validation flags (in metadata) by the results of validation process
func postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block) {
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
for _, tx := range validatedBlock.Txs {
txsFilter.SetFlag(tx.IndexInBlock, tx.ValidationCode)
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
func addPvtRWSetToPvtUpdateBatch(pvtRWSet *rwsetutil.TxPvtRwSet, pvtUpdateBatch *privacyenabledstate.PvtUpdateBatch, ver *version.Height) {
for _, ns := range pvtRWSet.NsPvtRwSet {
for _, coll := range ns.CollPvtRwSets {
for _, kvwrite := range coll.KvRwSet.Writes {
if !kvwrite.IsDelete {
pvtUpdateBatch.Put(ns.NameSpace, coll.CollectionName, kvwrite.Key, kvwrite.Value, ver)
} else {
pvtUpdateBatch.Delete(ns.NameSpace, coll.CollectionName, kvwrite.Key, ver)
}
}
}
}
}