/
hlfexecutor.go
110 lines (95 loc) · 3.24 KB
/
hlfexecutor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package hlf
import (
"context"
"time"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"
"github.com/hyperledger/fabric-sdk-go/pkg/client/channel/invoke"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/filter"
"github.com/hyperledger/fabric-sdk-go/pkg/common/errors/status"
chctx "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/pkg/errors"
)
type ExecuteOptions struct {
ExecuteTimeout time.Duration
}
type hlfExecutor struct {
chClient *channel.Client
chCtx chctx.Channel
execOpts ExecuteOptions
}
func (he *hlfExecutor) execute(ctx context.Context, request channel.Request) (channel.Response, error) {
var options []channel.RequestOption
executeTimeout := he.chCtx.EndpointConfig().Timeout(fab.Execute)
if he.execOpts.ExecuteTimeout > 0 {
executeTimeout = he.execOpts.ExecuteTimeout
}
options = append(options, channel.WithTimeout(fab.Execute, executeTimeout))
// use all channel peers from connection profile in the peers selection algorithm
options = append(options,
channel.WithTargetFilter(
filter.NewEndpointFilter(
he.chCtx, filter.EndorsingPeer)))
h := invoke.NewSelectAndEndorseHandler(
invoke.NewEndorsementValidationHandler(
invoke.NewSignatureValidationHandler(
&commitTxHandler{
ctx: ctx,
execOpts: he.execOpts,
}),
),
)
return he.chClient.InvokeHandler(h, request, options...)
}
// CommitTxHandler for committing transactions
type commitTxHandler struct {
ctx context.Context
execOpts ExecuteOptions
}
// Handle handles commit tx
func (cth *commitTxHandler) Handle(reqCtx *invoke.RequestContext, clientCtx *invoke.ClientContext) {
// register tx event
reg, statusNotifier, err := clientCtx.
EventService.RegisterTxStatusEvent(
string(reqCtx.Response.TransactionID))
if err != nil {
reqCtx.Error = errors.Wrap(errors.WithStack(err), "error registering for TxStatus event")
return
}
defer clientCtx.EventService.Unregister(reg)
tx, err := clientCtx.Transactor.CreateTransaction(
fab.TransactionRequest{
Proposal: reqCtx.Response.Proposal,
ProposalResponses: reqCtx.Response.Responses,
})
if err != nil {
reqCtx.Error = errors.Wrap(errors.WithStack(err), "createTransaction failed")
return
}
if _, err = clientCtx.Transactor.SendTransaction(tx); err != nil {
reqCtx.Error = errors.Wrap(errors.WithStack(err), "sendTransaction failed")
return
}
select {
case txStatus := <-statusNotifier:
reqCtx.Response.BlockNumber = txStatus.BlockNumber
reqCtx.Response.TxValidationCode = txStatus.TxValidationCode
if txStatus.TxValidationCode != pb.TxValidationCode_VALID {
reqCtx.Error = errors.WithStack(
status.New(status.EventServerStatus, int32(txStatus.TxValidationCode),
"received invalid transaction", nil))
}
return
case <-cth.ctx.Done():
reqCtx.Error = errors.WithStack(
status.New(status.ClientStatus, status.Unknown.ToInt32(),
"Execute didn't receive block event (context done)", nil))
return
case <-reqCtx.Ctx.Done():
reqCtx.Error = errors.WithStack(
status.New(status.ClientStatus, status.Timeout.ToInt32(),
"Execute didn't receive block event", nil))
return
}
}