Skip to content

Commit

Permalink
[FAB-8899] Dynamic selection service caching peers
Browse files Browse the repository at this point in the history
In the previous implementation, target peers
were passed into GetEndoresersForChaincode and the
selection service, on first init, cached these
peers and returned them every time thereafter. With
this change the dynamic selection service calls the
discovery service on each call.

The selection service now requires a channel context
in order to retrieve a handle to the discovery service.
A new Initialize function on the selection service is
called by the channel context after it is created in
order to provide it the context.

The peer target filter was also removed from the client
options and into the GetEndorsersForChaincode function.
This filter was really meant to be a per-request filter
as opposed to a per-channel client filter. A filter on
the channel client is not necessary since a custom
discovery provider may be injected with the required
custom filtering logic.

Change-Id: Ib6ad9de332c000c26f3d6d98b425caf1a3614252
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Mar 15, 2018
1 parent 5346c29 commit 523348b
Show file tree
Hide file tree
Showing 24 changed files with 590 additions and 270 deletions.
9 changes: 9 additions & 0 deletions pkg/client/channel/api.go
Expand Up @@ -22,6 +22,7 @@ import (
// opts allows the user to specify more advanced options
type requestOptions struct {
Targets []fab.Peer // targets
TargetFilter fab.TargetFilter
Retry retry.Opts
Timeouts map[core.TimeoutType]time.Duration //timeout options for channel client operations
ParentContext reqContext.Context //parent grpc context for channel client operations (query, execute, invokehandler)
Expand Down Expand Up @@ -82,6 +83,14 @@ func WithTargetURLs(urls ...string) RequestOption {
}
}

// WithTargetFilter specifies a per-request target peer-filter
func WithTargetFilter(filter fab.TargetFilter) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
o.TargetFilter = filter
return nil
}
}

// WithRetry option to configure retries
func WithRetry(retryOpt retry.Opts) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
Expand Down
58 changes: 21 additions & 37 deletions pkg/client/channel/chclient.go
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/client/channel/invoke"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/greylist"
"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
contextImpl "github.com/hyperledger/fabric-sdk-go/pkg/context"
Expand All @@ -33,33 +32,15 @@ var logger = logging.NewLogger("fabsdk/fab")
// An application that requires interaction with multiple channels should create a separate
// instance of the channel client for each channel. Channel client supports non-admin functions only.
type Client struct {
context context.Channel
membership fab.ChannelMembership
eventService fab.EventService
greylist *greylist.Filter
discoveryFilter fab.TargetFilter
}

type customChannelContext struct {
context.Channel
discoveryService fab.DiscoveryService
}

func (ccc *customChannelContext) DiscoveryService() fab.DiscoveryService {
return ccc.discoveryService
context context.Channel
membership fab.ChannelMembership
eventService fab.EventService
greylist *greylist.Filter
}

// ClientOption describes a functional parameter for the New constructor
type ClientOption func(*Client) error

// WithTargetFilter option to configure new
func WithTargetFilter(filter fab.TargetFilter) ClientOption {
return func(client *Client) error {
client.discoveryFilter = filter
return nil
}
}

// New returns a Client instance.
func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) {

Expand Down Expand Up @@ -88,21 +69,13 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
membership: membership,
eventService: eventService,
greylist: greylistProvider,
context: channelContext,
}

for _, param := range opts {
param(&channelClient)
}

//target filter
discoveryService := discovery.NewDiscoveryFilterService(channelContext.DiscoveryService(), channelClient.discoveryFilter)

//greylist filter
customDiscoveryService := discovery.NewDiscoveryFilterService(discoveryService, greylistProvider)

//update context
channelClient.context = &customChannelContext{Channel: channelContext, discoveryService: customDiscoveryService}

return &channelClient, nil
}

Expand Down Expand Up @@ -207,6 +180,16 @@ func (cc *Client) prepareHandlerContexts(reqCtx reqContext.Context, request Requ
return nil, nil, errors.WithMessage(err, "failed to create transactor")
}

peerFilter := func(peer fab.Peer) bool {
if !cc.greylist.Accept(peer) {
return false
}
if o.TargetFilter != nil && !o.TargetFilter.Accept(peer) {
return false
}
return true
}

clientContext := &invoke.ClientContext{
Selection: cc.context.SelectionService(),
Discovery: cc.context.DiscoveryService(),
Expand All @@ -216,11 +199,12 @@ func (cc *Client) prepareHandlerContexts(reqCtx reqContext.Context, request Requ
}

requestContext := &invoke.RequestContext{
Request: invoke.Request(request),
Opts: invoke.Opts(o),
Response: invoke.Response{},
RetryHandler: retry.New(o.Retry),
Ctx: reqCtx,
Request: invoke.Request(request),
Opts: invoke.Opts(o),
Response: invoke.Response{},
RetryHandler: retry.New(o.Retry),
Ctx: reqCtx,
SelectionFilter: peerFilter,
}

return requestContext, clientContext, nil
Expand Down
34 changes: 13 additions & 21 deletions pkg/client/channel/chclient_test.go
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/hyperledger/fabric-sdk-go/pkg/client/channel/invoke"
txnmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/staticselection"
"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
contextImpl "github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
Expand Down Expand Up @@ -115,15 +116,6 @@ func TestQuery(t *testing.T) {

}

func TestQueryDiscoveryError(t *testing.T) {
chClient := setupChannelClientWithError(errors.New("Test Error"), nil, nil, t)

_, err := chClient.Query(Request{ChaincodeID: "testCC", Fcn: "invoke", Args: [][]byte{[]byte("query"), []byte("b")}})
if err == nil {
t.Fatalf("Should have failed to query with error in discovery.GetPeers()")
}
}

func TestQuerySelectionError(t *testing.T) {
chClient := setupChannelClientWithError(nil, errors.New("Test Error"), nil, t)

Expand Down Expand Up @@ -302,16 +294,6 @@ func TestQueryWithCustomEndorser(t *testing.T) {
}
}

func TestExecuteTxDiscoveryError(t *testing.T) {
chClient := setupChannelClientWithError(errors.New("Test Error"), nil, nil, t)

_, err := chClient.Execute(Request{ChaincodeID: "testCC", Fcn: "invoke",
Args: [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("1")}})
if err == nil {
t.Fatalf("Should have failed to execute tx with error in discovery.GetPeers()")
}
}

func TestExecuteTxSelectionError(t *testing.T) {
chClient := setupChannelClientWithError(nil, errors.New("Test Error"), nil, t)

Expand Down Expand Up @@ -444,22 +426,32 @@ func TestMultiErrorPropogation(t *testing.T) {
assert.Equal(t, "Multiple errors occurred: \nTest Error\nTest Error", statusError.Message, "Expected multi error message")
}

type serviceInit interface {
Initialize(context context.Channel) error
}

func TestDiscoveryGreylist(t *testing.T) {

testPeer1 := fcmocks.NewMockPeer("Peer1", "http://peer1.com")
testPeer1.Error = status.New(status.EndorserClientStatus,
status.ConnectionFailed.ToInt32(), "test", []interface{}{testPeer1.URL()})

selectionService, err := setupTestSelection(nil, nil)
selectionProvider, err := staticselection.New(fcmocks.NewMockConfig())
assert.Nil(t, err, "Got error %s", err)

selectionService, err := selectionProvider.CreateSelectionService("mychannel")
assert.Nil(t, err, "Got error %s", err)
selectionService.SelectAll = true

discoveryService, err := setupTestDiscovery(nil, []fab.Peer{testPeer1})
assert.Nil(t, err, "Got error %s", err)

fabCtx := setupCustomTestContext(t, selectionService, discoveryService, nil)
ctx := createChannelContext(fabCtx, channelID)

channelCtx, err := ctx()
assert.Nil(t, err, "Got error %s", err)
selectionService.(serviceInit).Initialize(channelCtx)

chClient, err := New(ctx)
assert.Nil(t, err, "Got error %s", err)

Expand Down
15 changes: 9 additions & 6 deletions pkg/client/channel/invoke/api.go
Expand Up @@ -11,6 +11,7 @@ import (
reqContext "context"
"time"

selectopts "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/errors/retry"
Expand All @@ -20,6 +21,7 @@ import (
// Opts allows the user to specify more advanced options
type Opts struct {
Targets []fab.Peer // targets
TargetFilter fab.TargetFilter
Retry retry.Opts
Timeouts map[core.TimeoutType]time.Duration
ParentContext reqContext.Context //parent grpc context
Expand Down Expand Up @@ -59,10 +61,11 @@ type ClientContext struct {

//RequestContext contains request, opts, response parameters for handler execution
type RequestContext struct {
Request Request
Opts Opts
Response Response
Error error
RetryHandler retry.Handler
Ctx reqContext.Context
Request Request
Opts Opts
Response Response
Error error
RetryHandler retry.Handler
Ctx reqContext.Context
SelectionFilter selectopts.PeerFilter
}
22 changes: 9 additions & 13 deletions pkg/client/channel/invoke/txnhandler.go
Expand Up @@ -9,9 +9,11 @@ package invoke
import (
"bytes"

"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
"github.com/pkg/errors"

selectopts "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/txn"
Expand Down Expand Up @@ -64,23 +66,17 @@ type ProposalProcessorHandler struct {

//Handle selects proposal processors
func (h *ProposalProcessorHandler) Handle(requestContext *RequestContext, clientContext *ClientContext) {
//Get proposal processor, if not supplied then use discovery service to get available peers as endorser
//If selection service available then get endorser peers for this chaincode
//Get proposal processor, if not supplied then use selection service to get available peers as endorser
if len(requestContext.Opts.Targets) == 0 {
// Use discovery service to figure out proposal processors
peers, err := clientContext.Discovery.GetPeers()
var selectionOpts []options.Opt
if requestContext.SelectionFilter != nil {
selectionOpts = append(selectionOpts, selectopts.WithPeerFilter(requestContext.SelectionFilter))
}
endorsers, err := clientContext.Selection.GetEndorsersForChaincode([]string{requestContext.Request.ChaincodeID}, selectionOpts...)
if err != nil {
requestContext.Error = errors.WithMessage(err, "GetPeers failed")
requestContext.Error = errors.WithMessage(err, "Failed to get endorsing peers")
return
}
endorsers := peers
if clientContext.Selection != nil {
endorsers, err = clientContext.Selection.GetEndorsersForChaincode(peers, requestContext.Request.ChaincodeID)
if err != nil {
requestContext.Error = errors.WithMessage(err, "Failed to get endorsing peers")
return
}
}
requestContext.Opts.Targets = endorsers
}

Expand Down
44 changes: 31 additions & 13 deletions pkg/client/channel/invoke/txnhandler_test.go
Expand Up @@ -95,27 +95,20 @@ func TestQueryHandlerErrors(t *testing.T) {

//Prepare context objects for handler
requestContext := prepareRequestContext(request, Opts{}, t)
clientContext := setupChannelClientContext(errors.New(discoveryServiceError), nil, nil, t)

//Get query handler
queryHandler := NewQueryHandler()

//Perform action through handler
queryHandler.Handle(requestContext, clientContext)
if requestContext.Error == nil || !strings.Contains(requestContext.Error.Error(), discoveryServiceError) {
t.Fatal("Expected error: ", discoveryServiceError, ", Received error:", requestContext.Error.Error())
}

//Error Scenario 2
clientContext = setupChannelClientContext(nil, errors.New(selectionServiceError), nil, t)
//Error Scenario 1
clientContext := setupChannelClientContext(nil, errors.New(selectionServiceError), nil, t)

//Perform action through handler
queryHandler.Handle(requestContext, clientContext)
if requestContext.Error == nil || !strings.Contains(requestContext.Error.Error(), selectionServiceError) {
t.Fatal("Expected error: ", selectionServiceError, ", Received error:", requestContext.Error.Error())
}

//Error Scenario 3 different payload return
//Error Scenario 2 different payload return
mockPeer1 := &fcmocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", Status: 200,
Payload: []byte("value")}
mockPeer2 := &fcmocks.MockPeer{MockName: "Peer2", MockURL: "http://peer2.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", Status: 200,
Expand Down Expand Up @@ -165,9 +158,18 @@ func TestEndorsementHandler(t *testing.T) {
assert.Nil(t, requestContext.Error)
}

// Target filter
type filter struct {
peer fab.Peer
}

func (f *filter) Accept(p fab.Peer) bool {
return p.URL() == f.peer.URL()
}

func TestProposalProcessorHandler(t *testing.T) {
peer1 := fcmocks.NewMockPeer("p1", "")
peer2 := fcmocks.NewMockPeer("p2", "")
peer1 := fcmocks.NewMockPeer("p1", "peer1:7051")
peer2 := fcmocks.NewMockPeer("p2", "peer2:7051")
discoveryPeers := []fab.Peer{peer1, peer2}

//Get query handler
Expand Down Expand Up @@ -206,6 +208,18 @@ func TestProposalProcessorHandler(t *testing.T) {
if requestContext.Opts.Targets[0] != peer2 {
t.Fatalf("Didn't get expected peers")
}

requestContext = prepareRequestContext(request, Opts{TargetFilter: &filter{peer: peer2}}, t)
handler.Handle(requestContext, setupChannelClientContext(nil, nil, discoveryPeers, t))
if requestContext.Error != nil {
t.Fatalf("Got error: %s", requestContext.Error)
}
if len(requestContext.Opts.Targets) != 1 {
t.Fatalf("Expecting 1 proposal processor but got %d", len(requestContext.Opts.Targets))
}
if requestContext.Opts.Targets[0] != peer2 {
t.Fatalf("Didn't get expected peers")
}
}

//prepareHandlerContexts prepares context objects for handlers
Expand All @@ -218,6 +232,11 @@ func prepareRequestContext(request Request, opts Opts, t *testing.T) *RequestCon

requestContext.Opts.Timeouts = make(map[core.TimeoutType]time.Duration)
requestContext.Opts.Timeouts[core.Execute] = testTimeOut
if opts.TargetFilter != nil {
requestContext.SelectionFilter = func(peer fab.Peer) bool {
return opts.TargetFilter.Accept(peer)
}
}

return requestContext
}
Expand Down Expand Up @@ -264,7 +283,6 @@ func setupTestDiscovery(discErr error, peers []fab.Peer) (fab.DiscoveryService,
if err != nil {
return nil, errors.WithMessage(err, "NewMockDiscoveryProvider failed")
}

return mockDiscovery.CreateDiscoveryService("mychannel")
}

Expand Down

0 comments on commit 523348b

Please sign in to comment.