forked from hyperledger/fabric
/
endorse.go
271 lines (241 loc) · 10.6 KB
/
endorse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/*
Copyright 2021 IBM All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package gateway
import (
"context"
"fmt"
gp "github.com/ZihuaZhang/fabric-protos-go/gateway"
"github.com/ZihuaZhang/fabric-protos-go/peer"
"github.com/ZihuaZhang/fabric/common/flogging"
"github.com/ZihuaZhang/fabric/protoutil"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Endorse will collect endorsements by invoking the transaction function specified in the SignedProposal against
// sufficient Peers to satisfy the endorsement policy.
func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.EndorseResponse, error) {
if request == nil {
return nil, status.Error(codes.InvalidArgument, "an endorse request is required")
}
signedProposal := request.GetProposedTransaction()
if len(signedProposal.GetProposalBytes()) == 0 {
return nil, status.Error(codes.InvalidArgument, "the proposed transaction must contain a signed proposal")
}
proposal, err := protoutil.UnmarshalProposal(signedProposal.GetProposalBytes())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
header, err := protoutil.UnmarshalHeader(proposal.GetHeader())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
channelHeader, err := protoutil.UnmarshalChannelHeader(header.GetChannelHeader())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
payload, err := protoutil.UnmarshalChaincodeProposalPayload(proposal.GetPayload())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
spec, err := protoutil.UnmarshalChaincodeInvocationSpec(payload.GetInput())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
channel := channelHeader.GetChannelId()
chaincodeID := spec.GetChaincodeSpec().GetChaincodeId().GetName()
hasTransientData := len(payload.GetTransientMap()) > 0
logger := gs.logger.With("channel", channel, "chaincode", chaincodeID, "txID", request.GetTransactionId())
var plan *plan
var action *peer.ChaincodeEndorsedAction
if len(request.GetEndorsingOrganizations()) > 0 {
// The client is specifying the endorsing orgs and taking responsibility for ensuring it meets the signature policy
plan, err = gs.registry.planForOrgs(channel, chaincodeID, request.GetEndorsingOrganizations())
if err != nil {
return nil, status.Error(codes.Unavailable, err.Error())
}
} else {
// The client is delegating choice of endorsers to the gateway.
plan, err = gs.planFromFirstEndorser(ctx, channel, chaincodeID, hasTransientData, signedProposal, logger)
if err != nil {
return nil, err
}
}
for plan.completedLayout == nil {
// loop through the layouts until one gets satisfied
endorsers := plan.endorsers()
if endorsers == nil {
// no more layouts
break
}
// send to all the endorsers
waitCh := make(chan bool, len(endorsers))
for _, e := range endorsers {
go func(e *endorser) {
for e != nil {
if gs.processProposal(ctx, plan, e, signedProposal, logger) {
break
}
e = plan.nextPeerInGroup(e)
}
waitCh <- true
}(e)
}
for i := 0; i < len(endorsers); i++ {
select {
case <-waitCh:
// Endorser completedLayout normally
case <-ctx.Done():
logger.Warnw("Endorse call timed out while collecting endorsements", "numEndorsers", len(endorsers))
return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting endorsements")
}
}
}
if plan.completedLayout == nil {
return nil, newRpcError(codes.Aborted, "failed to collect enough transaction endorsements, see attached details for more info", plan.errorDetails...)
}
action = &peer.ChaincodeEndorsedAction{ProposalResponsePayload: plan.responsePayload, Endorsements: uniqueEndorsements(plan.completedLayout.endorsements)}
preparedTransaction, err := prepareTransaction(header, payload, action)
if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to assemble transaction: %s", err)
}
return &gp.EndorseResponse{PreparedTransaction: preparedTransaction}, nil
}
type ppResponse struct {
response *peer.ProposalResponse
err error
}
// processProposal will invoke the given endorsing peer to process the signed proposal, and will update the plan accordingly.
// This function will timeout and return false if the given context timeout or the EndorsementTimeout option expires.
// Returns boolean true if the endorsement was successful.
func (gs *Server) processProposal(ctx context.Context, plan *plan, endorser *endorser, signedProposal *peer.SignedProposal, logger *flogging.FabricLogger) bool {
var response *peer.ProposalResponse
done := make(chan *ppResponse)
go func() {
defer close(done)
logger.Debugw("Sending to endorser:", "MSPID", endorser.mspid, "endpoint", endorser.address)
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) // timeout of individual endorsement
defer cancel()
response, err := endorser.client.ProcessProposal(ctx, signedProposal)
done <- &ppResponse{response: response, err: err}
}()
select {
case resp := <-done:
// Endorser completedLayout normally
code, message, _, remove := responseStatus(resp.response, resp.err)
if code != codes.OK {
logger.Warnw("Endorse call to endorser failed", "MSPID", endorser.mspid, "endpoint", endorser.address, "error", message)
if remove {
gs.registry.removeEndorser(endorser)
}
plan.addError(errorDetail(endorser.endpointConfig, message))
return false
}
response = resp.response
logger.Debugw("Endorse call to endorser returned success", "MSPID", endorser.mspid, "endpoint", endorser.address, "status", response.Response.Status, "message", response.Response.Message)
responseMessage := response.GetResponse()
if responseMessage != nil {
responseMessage.Payload = nil // Remove any duplicate response payload
}
return plan.processEndorsement(endorser, response)
case <-ctx.Done():
// Overall endorsement timeout expired
return false
}
}
// planFromFirstEndorser implements the gateway's strategy of processing the proposal on a single (preferably local) peer
// and using the ChaincodeInterest from the response to invoke discovery and build an endorsement plan.
// Returns the endorsement plan which can be used to request further endorsements, if required.
func (gs *Server) planFromFirstEndorser(ctx context.Context, channel string, chaincodeID string, hasTransientData bool, signedProposal *peer.SignedProposal, logger *flogging.FabricLogger) (*plan, error) {
defaultInterest := &peer.ChaincodeInterest{
Chaincodes: []*peer.ChaincodeCall{{
Name: chaincodeID,
}},
}
// 1. Choose an endorser from the gateway's organization
plan, err := gs.registry.planForOrgs(channel, chaincodeID, []string{gs.registry.localEndorser.mspid})
if err != nil {
// No local org endorsers for this channel/chaincode. If transient data is involved, return error
if hasTransientData {
return nil, status.Error(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data")
}
// Otherwise, just let discovery pick one.
plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
}
firstEndorser := plan.endorsers()[0]
gs.logger.Debugw("Sending to first endorser:", "MSPID", firstEndorser.mspid, "endpoint", firstEndorser.address)
// 2. Process the proposal on this endorser
var firstResponse *peer.ProposalResponse
var errDetails []proto.Message
for firstResponse == nil && firstEndorser != nil {
done := make(chan struct{})
go func() {
defer close(done)
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
code, message, _, remove := responseStatus(firstResponse, err)
if code != codes.OK {
logger.Warnw("Endorse call to endorser failed", "endorserAddress", firstEndorser.address, "endorserMspid", firstEndorser.mspid, "error", message)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(firstEndorser)
}
firstEndorser = plan.nextPeerInGroup(firstEndorser)
firstResponse = nil
}
}()
select {
case <-done:
// Endorser completed normally
case <-ctx.Done():
// Overall endorsement timeout expired
logger.Warn("Endorse call timed out while collecting first endorsement")
return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting first endorsement")
}
}
if firstEndorser == nil || firstResponse == nil {
return nil, newRpcError(codes.Aborted, "failed to endorse transaction, see attached details for more info", errDetails...)
}
// 3. Extract ChaincodeInterest and SBE policies
// The chaincode interest could be nil for legacy peers and for chaincode functions that don't produce a read-write set
interest := firstResponse.Interest
if len(interest.GetChaincodes()) == 0 {
interest = defaultInterest
}
// 4. If transient data is involved, then we need to ensure that discovery only returns orgs which own the collections involved.
// Do this by setting NoPrivateReads to false on each collection
originalInterest := &peer.ChaincodeInterest{}
var protectedCollections []string
if hasTransientData {
for _, call := range interest.GetChaincodes() {
ccc := *call // shallow copy
originalInterest.Chaincodes = append(originalInterest.Chaincodes, &ccc)
if call.NoPrivateReads {
call.NoPrivateReads = false
protectedCollections = append(protectedCollections, call.CollectionNames...)
}
}
}
// 5. Get a set of endorsers from discovery via the registry
// The preferred discovery layout will contain the firstEndorser's Org.
plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser)
if err != nil {
if len(protectedCollections) > 0 {
// may have failed because of the cautious approach we are taking with transient data - check
_, err = gs.registry.endorsementPlan(channel, originalInterest, firstEndorser)
if err == nil {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("requires endorsement from organisation(s) that are not in the distribution policy of the private data collection(s): %v; retry specifying trusted endorsing organizations to protect transient data", protectedCollections))
}
}
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
// 6. Remove the gateway org's endorser, since we've already done that
plan.processEndorsement(firstEndorser, firstResponse)
return plan, nil
}