/
chservice.go
150 lines (125 loc) · 4.63 KB
/
chservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package dynamicdiscovery
import (
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/random"
coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
reqContext "github.com/hyperledger/fabric-sdk-go/pkg/context"
fabdiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
"github.com/pkg/errors"
)
// ChannelService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for information about the peers that
// are currently joined to the given channel.
type ChannelService struct {
// Embedded base service; assigned in NewChannelService from newService.
*service
// channelID is the ID of the channel whose peers are queried.
channelID string
// membership is consulted (via ContainsMSP) to decide whether a
// discovered peer is included in the result.
membership fab.ChannelMembership
}
// NewChannelService builds a ChannelService that discovers the member peers
// of the channel identified by channelID.
//
// The embedded service is created from the client's endpoint config with
// queryPeers as its query function, and is then initialized with ctx. If
// initialization fails, that error is returned together with a nil service.
func NewChannelService(ctx contextAPI.Client, membership fab.ChannelMembership, channelID string, opts ...coptions.Opt) (*ChannelService, error) {
	logger.Debug("Creating new dynamic discovery service")

	chService := &ChannelService{channelID: channelID, membership: membership}
	chService.service = newService(ctx.EndpointConfig(), chService.queryPeers, opts...)

	if err := chService.service.initialize(ctx); err != nil {
		return nil, err
	}
	return chService, nil
}
// Close releases resources by closing the embedded discovery service.
// A debug message naming the channel is logged first.
func (s *ChannelService) Close() {
logger.Debugf("Closing discovery service for channel [%s]", s.channelID)
s.service.Close()
}
// queryPeers runs a discovery query for the channel's peers. When the query
// fails and an error handler is set, the handler is invoked with the error
// before the result is returned. The peers and error from the query are
// always returned unchanged.
func (s *ChannelService) queryPeers() ([]fab.Peer, error) {
	peers, err := s.doQueryPeers()
	if err == nil || s.ErrHandler == nil {
		return peers, err
	}

	logger.Infof("[%s] Got error from discovery query: %s. Invoking error handler", s.channelID, err)
	s.ErrHandler(s.ctx, s.channelID, err)
	return peers, err
}
// doQueryPeers sends a peers query for the channel to the selected target
// peers and converts the responses into a list of peers.
//
// It fails when no targets can be determined, or when the discovery client
// returns an error and no responses at all. If the client returns an error
// alongside at least one response, the error is logged as a warning and the
// responses are still evaluated.
func (s *ChannelService) doQueryPeers() ([]fab.Peer, error) {
	logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelID)

	clientCtx := s.context()

	targets, err := s.getTargets(clientCtx)
	switch {
	case err != nil:
		return nil, err
	case len(targets) == 0:
		return nil, errors.Errorf("no peers configured for channel [%s]", s.channelID)
	}

	// The request context carries the configured response timeout.
	reqCtx, cancel := reqContext.NewRequest(clientCtx, reqContext.WithTimeout(s.responseTimeout))
	defer cancel()

	query := discclient.NewRequest().OfChannel(s.channelID).AddPeersQuery()
	responses, sendErr := s.discoveryClient().Send(reqCtx, query, targets...)
	if sendErr != nil {
		if len(responses) == 0 {
			return nil, errors.Wrapf(sendErr, "error calling discover service send")
		}
		// Partial success: keep going with the responses that did arrive.
		logger.Warnf("Received %d response(s) and one or more errors from discovery client: %s", len(responses), sendErr)
	}

	return s.evaluate(clientCtx, responses)
}
// getTargets returns the peer configs to which the discovery query is sent.
// The candidates are the peers configured for the channel in the endpoint
// config. An error is returned when none are configured.
func (s *ChannelService) getTargets(ctx contextAPI.Client) ([]fab.PeerConfig, error) {
	candidates := ctx.EndpointConfig().ChannelPeers(s.channelID)
	if len(candidates) == 0 {
		return nil, errors.Errorf("no channel peers configured for channel [%s]", s.channelID)
	}

	// The channel's discovery policy caps how many peers are picked.
	maxTargets := ctx.EndpointConfig().ChannelConfig(s.channelID).Policies.Discovery.MaxTargets
	return random.PickRandomNPeerConfigs(candidates, maxTargets), nil
}
// evaluate extracts the channel's peers from the discovery responses.
//
// The responses are tried in order and the peers of the first one that can
// be read are returned. If every response fails, the error from the last
// one is returned. An empty responses slice is an error.
func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscovery.Response) ([]fab.Peer, error) {
	if len(responses) == 0 {
		return nil, errors.New("no successful response received from any peer")
	}

	// TODO: In a future patch:
	// - validate the signatures in the responses
	// For now just pick the first successful response

	var lastErr error
	for _, resp := range responses {
		endpoints, err := resp.ForChannel(s.channelID).Peers()
		if err == nil {
			return s.asPeers(ctx, endpoints), nil
		}

		lastErr = DiscoveryError(err)
		logger.Warnf("error getting peers from discovery response: %s", lastErr)
	}

	return nil, lastErr
}
// asPeers converts the endpoints returned by the discovery service into
// fab.Peer values that also carry the peer's block height.
//
// Endpoints that asPeer cannot convert are skipped, as are peers whose MSP
// is not contained in the channel membership. The result is nil when no
// endpoint qualifies.
func (s *ChannelService) asPeers(ctx contextAPI.Client, endpoints []*discclient.Peer) []fab.Peer {
	var peers []fab.Peer
	for _, endpoint := range endpoints {
		peer, ok := asPeer(ctx, endpoint)
		if !ok {
			continue
		}

		// Check if the cache is updated with the TLS cert: if this peer belongs
		// to a newly joined org, membership may not have finished updating its
		// cache yet, in which case the peer is left out for now.
		if !s.membership.ContainsMSP(peer.MSPID()) {
			continue
		}

		peers = append(peers, &peerEndpoint{
			Peer: peer,
			// Read the height through the generated getter rather than the
			// struct field: the getter returns 0 when the state info carries
			// no properties, whereas the field access would panic on nil.
			blockHeight: endpoint.StateInfoMessage.GetStateInfo().GetProperties().GetLedgerHeight(),
		})
	}
	return peers
}
// peerEndpoint is a fab.Peer that additionally carries a block height.
type peerEndpoint struct {
fab.Peer
// blockHeight is the value returned by BlockHeight.
blockHeight uint64
}
// BlockHeight returns the block height stored for this peer endpoint.
func (p *peerEndpoint) BlockHeight() uint64 {
return p.blockHeight
}