forked from hyperledger/fabric-sdk-go
/
chclient.go
305 lines (258 loc) · 10.4 KB
/
chclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
// Package channel enables access to a channel on a Fabric network. A channel client instance provides a handler to interact with peers on a specified channel.
// A channel client can query chaincode, execute chaincode and register/unregister for chaincode events on a specific channel.
// An application that requires interaction with multiple channels should create a separate instance of the channel client for each channel.
//
// Basic Flow:
// 1) Prepare channel client context
// 2) Create channel client
// 3) Execute chaincode
// 4) Query chaincode
package channel
import (
reqContext "context"
"time"
"github.com/birdycloud/fabric-sdk-go/pkg/client/channel/invoke"
"github.com/birdycloud/fabric-sdk-go/pkg/client/common/discovery/greylist"
"github.com/birdycloud/fabric-sdk-go/pkg/client/common/filter"
selectopts "github.com/birdycloud/fabric-sdk-go/pkg/client/common/selection/options"
"github.com/birdycloud/fabric-sdk-go/pkg/common/errors/retry"
"github.com/birdycloud/fabric-sdk-go/pkg/common/errors/status"
"github.com/birdycloud/fabric-sdk-go/pkg/common/providers/context"
"github.com/birdycloud/fabric-sdk-go/pkg/common/providers/fab"
contextImpl "github.com/birdycloud/fabric-sdk-go/pkg/context"
"github.com/birdycloud/fabric-sdk-go/pkg/fabsdk/metrics"
"github.com/pkg/errors"
)
// Client enables access to a channel on a Fabric network.
//
// A channel client instance provides a handler to interact with peers on a specified channel.
// An application that requires interaction with multiple channels should create a separate
// instance of the channel client for each channel. Channel client supports non-admin functions only.
type Client struct {
// context is the channel context; it is used to reach the channel service and the endpoint configuration.
context context.Channel
// membership is handed to invoke handlers through the client context.
membership fab.ChannelMembership
// eventService serves chaincode event registrations and is handed to invoke handlers.
eventService fab.EventService
// greylist filters peers during selection and is fed the error before each retry.
greylist *greylist.Filter
// metrics is not referenced in this part of the file; presumably client metrics set by newClient — TODO confirm.
metrics *metrics.ClientMetrics
}
// ClientOption describes a functional parameter for the New constructor.
// An option receives the Client being constructed; returning a non-nil error makes New fail.
type ClientOption func(*Client) error
// New returns a Client instance for the channel supplied by channelProvider.
// The channel client can query chaincode, execute chaincode and register/unregister
// for chaincode events on that channel.
// Parameters:
// channelProvider supplies the channel context
// opts holds optional client options, applied in the order given
//
// Returns:
// the channel client, or an error when the channel context, channel service,
// event service, membership or any option cannot be set up
func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) {
	chCtx, err := channelProvider()
	if err != nil {
		return nil, errors.WithMessage(err, "failed to create channel context")
	}

	// The greylist expiry comes from the endpoint configuration.
	greylistFilter := greylist.New(chCtx.EndpointConfig().Timeout(fab.DiscoveryGreylistExpiry))

	if chCtx.ChannelService() == nil {
		return nil, errors.New("channel service not initialized")
	}

	evtService, err := chCtx.ChannelService().EventService()
	if err != nil {
		return nil, errors.WithMessage(err, "event service creation failed")
	}

	members, err := chCtx.ChannelService().Membership()
	if err != nil {
		return nil, errors.WithMessage(err, "membership creation failed")
	}

	client := newClient(chCtx, members, evtService, greylistFilter)

	// Apply caller-supplied options; the first failure aborts construction.
	for _, opt := range opts {
		if err := opt(&client); err != nil {
			return nil, errors.WithMessage(err, "option failed")
		}
	}

	return &client, nil
}
// Query chaincode using request and optional request options.
// A default fab.Query timeout and a default ChaincodeQuery target filter are
// appended after the caller's options; each default is a no-op when the
// corresponding value has already been set.
// Parameters:
// request holds info about mandatory chaincode ID and function
// options holds optional request options
//
// Returns:
// the proposal responses from peer(s)
func (cc *Client) Query(request Request, options ...RequestOption) (Response, error) {
	options = append(options,
		addDefaultTimeout(fab.Query),
		addDefaultTargetFilter(cc.context, filter.ChaincodeQuery),
	)

	return callQuery(cc, request, options...)
}
// Execute prepares and executes transaction using request and optional request options.
// A default fab.Execute timeout and a default EndorsingPeer target filter are
// appended after the caller's options; each default is a no-op when the
// corresponding value has already been set.
// Parameters:
// request holds info about mandatory chaincode ID and function
// options holds optional request options
//
// Returns:
// the proposal responses from peer(s)
func (cc *Client) Execute(request Request, options ...RequestOption) (Response, error) {
	options = append(options,
		addDefaultTimeout(fab.Execute),
		addDefaultTargetFilter(cc.context, filter.EndorsingPeer),
	)

	return callExecute(cc, request, options...)
}
// addDefaultTargetFilter returns a request option that installs an endpoint
// filter of type ft for channel chCtx, but only when the request options carry
// neither explicit targets nor a target filter.
func addDefaultTargetFilter(chCtx context.Channel, ft filter.EndpointType) RequestOption {
	return func(ctx context.Client, o *requestOptions) error {
		// Explicit targets or an explicit filter take precedence over the default.
		if len(o.Targets) != 0 || o.TargetFilter != nil {
			return nil
		}

		defaultFilter := filter.NewEndpointFilter(chCtx, ft)
		return WithTargetFilter(defaultFilter)(ctx, o)
	}
}
// addDefaultTimeout returns a request option that sets the timeout of type tt
// to the value from the endpoint configuration, but only when the request
// options hold no (or a zero) timeout for tt.
func addDefaultTimeout(tt fab.TimeoutType) RequestOption {
	return func(ctx context.Client, o *requestOptions) error {
		// A non-zero timeout was already provided; leave it alone.
		if o.Timeouts[tt] != 0 {
			return nil
		}

		configured := ctx.EndpointConfig().Timeout(tt)
		return WithTimeout(tt, configured)(ctx, o)
	}
}
// InvokeHandler invokes handler using request and optional request options provided.
// The handler runs in its own goroutine under a retry invoker while this method
// waits for either completion or the end of the request context.
// Parameters:
// handler to be invoked
// request holds info about mandatory chaincode ID and function
// options holds optional request options
//
// Returns:
// the response and error recorded in the request context by the handler, or a
// client status error with the Timeout code when the request context is done first
func (cc *Client) InvokeHandler(handler invoke.Handler, request Request, options ...RequestOption) (Response, error) {
	// Collapse the request options into a single options value.
	opts, err := cc.prepareOptsFromOptions(cc.context, options...)
	if err != nil {
		return Response{}, err
	}

	reqCtx, cancel := cc.createReqContext(&opts)
	defer cancel()

	// Build the context objects handed to the handler.
	handlerReq, handlerClient, err := cc.prepareHandlerContexts(reqCtx, request, opts)
	if err != nil {
		return Response{}, err
	}

	// Runs before every retry: call the caller's hook, pass the error to the
	// greylist, then restore the per-attempt state of the request context.
	beforeRetry := func(retryErr error) {
		if hook := handlerReq.Opts.BeforeRetry; hook != nil {
			hook(retryErr)
		}

		cc.greylist.Greylist(retryErr)

		handlerReq.Opts.Targets = opts.Targets
		handlerReq.Error = nil
		handlerReq.Response = invoke.Response{}
	}

	invoker := retry.NewInvoker(handlerReq.RetryHandler, retry.WithBeforeRetry(beforeRetry))

	// Capacity 1 lets the goroutine finish its send even when this method has
	// already returned because the request context was done.
	done := make(chan bool, 1)

	go func() {
		attempt := func() (interface{}, error) {
			handler.Handle(handlerReq, handlerClient)
			return nil, handlerReq.Error
		}

		// The invoker's results are dropped; the outcome is read from the
		// request context below.
		_, _ = invoker.Invoke(attempt) // nolint: gas

		done <- true
	}()

	select {
	case <-reqCtx.Done():
		return Response{}, status.New(status.ClientStatus, status.Timeout.ToInt32(),
			"request timed out or been cancelled", nil)
	case <-done:
		return Response(handlerReq.Response), handlerReq.Error
	}
}
// createReqContext creates the request context for the invoke handler.
// It initializes txnOpts.Timeouts when nil and fills in the configured
// fab.Execute timeout when none is set, so txnOpts is modified in place.
// The returned context carries txnOpts.Timeouts as a value under
// contextImpl.ReqContextTimeoutOverrides; the returned cancel function is the
// one produced by contextImpl.NewRequest.
func (cc *Client) createReqContext(txnOpts *requestOptions) (reqContext.Context, reqContext.CancelFunc) {
	if txnOpts.Timeouts == nil {
		txnOpts.Timeouts = make(map[fab.TimeoutType]time.Duration)
	}

	// Fall back to the configured Execute timeout when the caller gave none.
	execTimeout := txnOpts.Timeouts[fab.Execute]
	if execTimeout == 0 {
		execTimeout = cc.context.EndpointConfig().Timeout(fab.Execute)
		txnOpts.Timeouts[fab.Execute] = execTimeout
	}

	base, cancel := contextImpl.NewRequest(
		cc.context,
		contextImpl.WithTimeout(execTimeout),
		contextImpl.WithParent(txnOpts.ParentContext),
	)

	// Expose the timeout overrides as a context value so that immediate child
	// contexts (in handlers/transactors) can use them.
	withOverrides := reqContext.WithValue(base, contextImpl.ReqContextTimeoutOverrides, txnOpts.Timeouts)

	return withOverrides, cancel
}
// prepareHandlerContexts prepares the context objects for handlers.
// It validates that the request names a chaincode ID and a function, obtains
// the transactor, selection and discovery services from the channel service,
// and assembles the request and client contexts.
// Parameters:
// reqCtx is the request context used for the transactor and stored in the request context
// request holds info about mandatory chaincode ID and function
// o holds the resolved request options
//
// Returns:
// the handler request context and client context, or an error when validation
// fails or a service cannot be created
func (cc *Client) prepareHandlerContexts(reqCtx reqContext.Context, request Request, o requestOptions) (*invoke.RequestContext, *invoke.ClientContext, error) {
	if request.ChaincodeID == "" || request.Fcn == "" {
		return nil, nil, errors.New("ChaincodeID and Fcn are required")
	}

	txr, err := cc.context.ChannelService().Transactor(reqCtx)
	if err != nil {
		return nil, nil, errors.WithMessage(err, "failed to create transactor")
	}

	selectionSvc, err := cc.context.ChannelService().Selection()
	if err != nil {
		return nil, nil, errors.WithMessage(err, "failed to create selection service")
	}

	discoverySvc, err := cc.context.ChannelService().Discovery()
	if err != nil {
		return nil, nil, errors.WithMessage(err, "failed to create discovery service")
	}

	// A peer is eligible when the greylist accepts it and, if a target filter
	// was supplied, that filter accepts it too.
	acceptPeer := func(peer fab.Peer) bool {
		return cc.greylist.Accept(peer) &&
			(o.TargetFilter == nil || o.TargetFilter.Accept(peer))
	}

	// The sorter stays nil unless the options carry a target sorter.
	var sortPeers selectopts.PeerSorter
	if o.TargetSorter != nil {
		sortPeers = func(peers []fab.Peer) []fab.Peer {
			return o.TargetSorter.Sort(peers)
		}
	}

	handlerReq := &invoke.RequestContext{
		Request:         invoke.Request(request),
		Opts:            invoke.Opts(o),
		Response:        invoke.Response{},
		RetryHandler:    retry.New(o.Retry),
		Ctx:             reqCtx,
		SelectionFilter: acceptPeer,
		PeerSorter:      sortPeers,
	}

	handlerClient := &invoke.ClientContext{
		Selection:    selectionSvc,
		Discovery:    discoverySvc,
		Membership:   cc.membership,
		Transactor:   txr,
		EventService: cc.eventService,
	}

	return handlerReq, handlerClient, nil
}
// prepareOptsFromOptions builds a requestOptions value by applying each option
// in order to an initially empty value. It stops at the first option that
// fails and returns the options built so far together with the wrapped error.
func (cc *Client) prepareOptsFromOptions(ctx context.Client, options ...RequestOption) (requestOptions, error) {
	var txnOpts requestOptions

	for _, apply := range options {
		if err := apply(ctx, &txnOpts); err != nil {
			return txnOpts, errors.WithMessage(err, "Failed to read opts")
		}
	}

	return txnOpts, nil
}
// RegisterChaincodeEvent registers for chaincode events by delegating to the client's event service.
// Unregister must be called when the registration is no longer needed.
// Parameters:
// chainCodeID is the chaincode ID for which events are to be received
// eventFilter is the chaincode event filter (regular expression) for which events are to be received
//
// Returns:
// the registration and a channel that is used to receive events. The channel is closed when Unregister is called.
func (cc *Client) RegisterChaincodeEvent(chainCodeID string, eventFilter string) (fab.Registration, <-chan *fab.CCEvent, error) {
	registration, events, err := cc.eventService.RegisterChaincodeEvent(chainCodeID, eventFilter)
	return registration, events, err
}
// UnregisterChaincodeEvent removes the given registration by handing it to the
// event service's Unregister; per the registration contract this also closes the event channel.
// Parameters:
// registration is the registration handle that was returned from RegisterChaincodeEvent method
func (cc *Client) UnregisterChaincodeEvent(registration fab.Registration) {
	evtService := cc.eventService
	evtService.Unregister(registration)
}