Skip to content

Commit

Permalink
FABGW-6 endpoint factory to use refactored comms
Browse files Browse the repository at this point in the history
Refactored the endpoint factory code in the gateway to better use the refactored comms package.  Allows the code to be unit tested with fairly good coverage.
Resolved the final TODO on configurable connection timeout

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman authored and sykesm committed Mar 10, 2021
1 parent e71b99a commit 2d81cb2
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 135 deletions.
2 changes: 2 additions & 0 deletions core/peer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func TestGlobalConfig(t *testing.T) {
viper.Set("peer.validatorPoolSize", 1)
viper.Set("peer.gateway.enabled", true)
viper.Set("peer.gateway.endorsementTimeout", 10*time.Second)
viper.Set("peer.gateway.dialTimeout", 60*time.Second)

viper.Set("vm.endpoint", "unix:///var/run/docker.sock")
viper.Set("vm.docker.tls.enabled", false)
Expand Down Expand Up @@ -375,6 +376,7 @@ func TestGlobalConfig(t *testing.T) {
GatewayOptions: gateway.Options{
Enabled: true,
EndorsementTimeout: 10 * time.Second,
DialTimeout: 60 * time.Second,
},
}

Expand Down
14 changes: 7 additions & 7 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (gs *Server) Evaluate(ctx context.Context, proposedTransaction *gp.Proposed
if len(endorsers) == 0 {
return nil, fmt.Errorf("no endorsing peers found for channel: %s", proposedTransaction.ChannelId)
}
response, err := endorsers[0].ProcessProposal(ctx, signedProposal) // TODO choose suitable peer based on block height, etc (future user story)
response, err := endorsers[0].client.ProcessProposal(ctx, signedProposal) // TODO choose suitable peer based on block height, etc (future user story)
if err != nil {
return nil, fmt.Errorf("failed to evaluate transaction: %w", err)
}
Expand Down Expand Up @@ -77,13 +77,13 @@ func (gs *Server) Endorse(ctx context.Context, proposedTransaction *gp.ProposedT
var wg sync.WaitGroup
responseCh := make(chan *endorserResponse, len(endorsers))
// send to all the endorsers
for _, endorser := range endorsers {
for _, e := range endorsers {
wg.Add(1)
go func(endorser peer.EndorserClient) {
go func(endorser *endorser) {
defer wg.Done()
response, err := endorser.ProcessProposal(ctx, signedProposal)
response, err := endorser.client.ProcessProposal(ctx, signedProposal)
responseCh <- &endorserResponse{pr: response, err: err}
}(endorser)
}(e)
}
wg.Wait()
close(responseCh)
Expand Down Expand Up @@ -136,11 +136,11 @@ func (gs *Server) Submit(txn *gp.PreparedTransaction, cs gp.Gateway_SubmitServer

// send to first orderer for now
logger.Info("Submitting txn to orderer")
if err := orderers[0].Send(txn.Envelope); err != nil {
if err := orderers[0].client.Send(txn.Envelope); err != nil {
return fmt.Errorf("failed to send envelope to orderer: %w", err)
}

response, err := orderers[0].Recv()
response, err := orderers[0].client.Recv()
if err != nil {
return err
}
Expand Down
110 changes: 69 additions & 41 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"testing"
"time"

"github.com/spf13/viper"

"github.com/golang/protobuf/proto"
cp "github.com/hyperledger/fabric-protos-go/common"
dp "github.com/hyperledger/fabric-protos-go/discovery"
Expand All @@ -29,7 +27,9 @@ import (
"github.com/hyperledger/fabric/internal/pkg/gateway/mocks"
idmocks "github.com/hyperledger/fabric/internal/pkg/identity/mocks"
"github.com/hyperledger/fabric/protoutil"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

// The following private interfaces are here purely to prevent counterfeiter creating an import cycle in the unit test
Expand All @@ -48,8 +48,8 @@ type submitServer interface {
pb.Gateway_SubmitServer
}

//go:generate counterfeiter -o mocks/orderer.go --fake-name Orderer . orderer
type orderer interface {
//go:generate counterfeiter -o mocks/abclient.go --fake-name ABClient . abClient
type abClient interface {
ab.AtomicBroadcast_BroadcastClient
}

Expand Down Expand Up @@ -129,12 +129,7 @@ func TestGateway(t *testing.T) {

server := CreateServer(localEndorser, disc, "localhost:7051", options)

factory := &endpointFactory{
t: t,
proposalResponse: "mock_response",
}
server.registry.endorserFactory = factory.mockEndorserFactory
server.registry.ordererFactory = factory.mockOrdererFactory
server.registry.endpointFactory = createEndpointFactory(t, "mock_response", "mock_orderer_response")

require.NoError(t, err, "Failed to sign the proposal")
ctx := context.WithValue(context.Background(), contextKey("orange"), "apples")
Expand Down Expand Up @@ -187,6 +182,38 @@ func TestGateway(t *testing.T) {
processProposalError: fmt.Errorf("mumbo-jumbo"),
errString: "mumbo-jumbo",
},
{
name: "dialing endorser endpoint fails",
plan: endorsementPlan{
"g1": {"peer2:9051"},
},
signedProposal: validSignedProposal,
setupRegistry: func(reg *registry) {
reg.endpointFactory.dialer = func(_ context.Context, target string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
if target == "peer2:9051" {
return nil, fmt.Errorf("endorser not answering")
}
return nil, nil
}
},
errString: "failed to create new connection: endorser not answering",
},
{
name: "dialing orderer endpoint fails",
plan: endorsementPlan{
"g1": {"peer2:9051"},
},
signedProposal: validSignedProposal,
setupRegistry: func(reg *registry) {
reg.endpointFactory.dialer = func(_ context.Context, target string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
if target == "orderer:7050" {
return nil, fmt.Errorf("orderer not answering")
}
return nil, nil
}
},
errString: "failed to create new connection: orderer not answering",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -395,10 +422,10 @@ func TestGateway(t *testing.T) {
},
signedProposal: validSignedProposal,
setupRegistry: func(reg *registry) {
reg.ordererFactory = func(address string, tlsRootCerts [][]byte) (ab.AtomicBroadcast_BroadcastClient, error) {
orderer := &mocks.Orderer{}
orderer.SendReturns(fmt.Errorf("Orderer says no!"))
return orderer, nil
reg.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
abc := &mocks.ABClient{}
abc.SendReturns(fmt.Errorf("Orderer says no!"))
return abc, nil
}
},
errString: "failed to send envelope to orderer: Orderer says no!",
Expand All @@ -410,10 +437,10 @@ func TestGateway(t *testing.T) {
},
signedProposal: validSignedProposal,
setupRegistry: func(reg *registry) {
reg.ordererFactory = func(address string, tlsRootCerts [][]byte) (ab.AtomicBroadcast_BroadcastClient, error) {
orderer := &mocks.Orderer{}
orderer.RecvReturns(nil, fmt.Errorf("Orderer not happy!"))
return orderer, nil
reg.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
abc := &mocks.ABClient{}
abc.RecvReturns(nil, fmt.Errorf("Orderer not happy!"))
return abc, nil
}
},
errString: "Orderer not happy!",
Expand All @@ -425,10 +452,10 @@ func TestGateway(t *testing.T) {
},
signedProposal: validSignedProposal,
setupRegistry: func(reg *registry) {
reg.ordererFactory = func(address string, tlsRootCerts [][]byte) (ab.AtomicBroadcast_BroadcastClient, error) {
orderer := &mocks.Orderer{}
orderer.RecvReturns(nil, nil)
return orderer, nil
reg.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
abc := &mocks.ABClient{}
abc.RecvReturns(nil, nil)
return abc, nil
}
},
errString: "received nil response from orderer",
Expand Down Expand Up @@ -539,25 +566,26 @@ func createMockPeer(t *testing.T, name string) *dp.Peer {
}
}

type endpointFactory struct {
t *testing.T
proposalResponse string
broadcastResponse string
}

func (ef *endpointFactory) mockEndorserFactory(address string, tlsRootCerts [][]byte) (peer.EndorserClient, error) {
endorser := &mocks.EndorserClient{}
endorser.ProcessProposalReturns(createProposalResponse(ef.t, ef.proposalResponse), nil)
return endorser, nil
}

func (ef *endpointFactory) mockOrdererFactory(address string, tlsRootCerts [][]byte) (ab.AtomicBroadcast_BroadcastClient, error) {
orderer := &mocks.Orderer{}
orderer.RecvReturns(&ab.BroadcastResponse{
Info: ef.broadcastResponse,
Status: 200,
}, nil)
return orderer, nil
func createEndpointFactory(t *testing.T, proposalResponse string, broadcastResponse string) *endpointFactory {
return &endpointFactory{
timeout: 5 * time.Second,
connectEndorser: func(_ *grpc.ClientConn) peer.EndorserClient {
e := &mocks.EndorserClient{}
e.ProcessProposalReturns(createProposalResponse(t, proposalResponse), nil)
return e
},
connectOrderer: func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
abc := &mocks.ABClient{}
abc.RecvReturns(&ab.BroadcastResponse{
Info: broadcastResponse,
Status: 200,
}, nil)
return abc, nil
},
dialer: func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
return nil, nil
},
}
}

func createProposal(t *testing.T, channel string, chaincode string, args ...[]byte) *peer.Proposal {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/gateway/apiutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func getValueFromResponse(response *peer.ProposalResponse) (*gateway.Result, err
}

if extension != nil && extension.Response != nil {
if extension.Response.Status > 200 {
if extension.Response.Status < 200 || extension.Response.Status >= 400 {
return nil, fmt.Errorf("error %d, %s", extension.Response.Status, extension.Response.Message)
}
retVal = extension.Response.Payload
Expand Down
13 changes: 10 additions & 3 deletions internal/pkg/gateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ import (
"github.com/spf13/viper"
)

// GatewayOptions is used to configure the gateway settings
// GatewayOptions is used to configure the gateway settings.
type Options struct {
// GatewayEnabled is used to enable the gateway service
Enabled bool
// GatewayEnabled is used to enable the gateway service.
Enabled bool
// EndorsementTimeout is used to specify the maximum time to wait for endorsement responses from external peers.
EndorsementTimeout time.Duration
// DialTimeout is used to specify the maximum time to wait for connecting to external peers and orderer nodes.
DialTimeout time.Duration
}

var defaultOptions = Options{
Enabled: false,
EndorsementTimeout: 10 * time.Second,
DialTimeout: 30 * time.Second,
}

// DefaultOptions gets the default Gateway configuration Options
Expand All @@ -33,6 +37,9 @@ func GetOptions(v *viper.Viper) Options {
if v.IsSet("peer.gateway.endorsementTimeout") {
options.EndorsementTimeout = v.GetDuration("peer.gateway.endorsementTimeout")
}
if v.IsSet("peer.gateway.dialTimeout") {
options.DialTimeout = v.GetDuration("peer.gateway.dialTimeout")
}

return options
}
2 changes: 2 additions & 0 deletions internal/pkg/gateway/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ peer:
gateway:
enabled: true
endorsementTimeout: 30s
dialTimeout: 2m
`)

func TestDefaultOptions(t *testing.T) {
Expand All @@ -37,6 +38,7 @@ func TestOverriddenOptions(t *testing.T) {
expectedOptions := Options{
Enabled: true,
EndorsementTimeout: 30 * time.Second,
DialTimeout: 2 * time.Minute,
}
require.Equal(t, expectedOptions, options)
}
Loading

0 comments on commit 2d81cb2

Please sign in to comment.