Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FABGW-6 Use concurrent go routines for endorsement #2453

Merged
merged 1 commit into from Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions core/peer/config.go
Expand Up @@ -273,8 +273,7 @@ func (c *Config) load() error {
c.DeliverClientKeepaliveOptions.ClientTimeout = viper.GetDuration("peer.keepalive.deliveryClient.timeout")
}

c.GatewayOptions = gateway.DefaultOptions()
c.GatewayOptions.Enabled = viper.GetBool("peer.gateway.enabled")
c.GatewayOptions = gateway.GetOptions(viper.GetViper())

c.VMEndpoint = viper.GetString("vm.endpoint")
c.VMDockerTLSEnabled = viper.GetBool("vm.docker.tls.enabled")
Expand Down
6 changes: 5 additions & 1 deletion core/peer/config_test.go
Expand Up @@ -276,6 +276,7 @@ func TestGlobalConfig(t *testing.T) {
viper.Set("peer.chaincodeAddress", "0.0.0.0:7052")
viper.Set("peer.validatorPoolSize", 1)
viper.Set("peer.gateway.enabled", true)
viper.Set("peer.gateway.endorsementTimeout", 10*time.Second)

viper.Set("vm.endpoint", "unix:///var/run/docker.sock")
viper.Set("vm.docker.tls.enabled", false)
Expand Down Expand Up @@ -372,7 +373,8 @@ func TestGlobalConfig(t *testing.T) {
DockerCA: filepath.Join(cwd, "test/vm/tls/ca/file"),

GatewayOptions: gateway.Options{
Enabled: true,
Enabled: true,
EndorsementTimeout: 10 * time.Second,
},
}

Expand All @@ -392,6 +394,7 @@ func TestGlobalConfigDefault(t *testing.T) {
ValidatorPoolSize: runtime.NumCPU(),
VMNetworkMode: "host",
DeliverClientKeepaliveOptions: comm.DefaultKeepaliveOptions,
GatewayOptions: gateway.GetOptions(viper.GetViper()),
}

require.Equal(t, expectedConfig, coreConfig)
Expand Down Expand Up @@ -446,6 +449,7 @@ func TestPropagateEnvironment(t *testing.T) {
Path: "/testPath",
},
},
GatewayOptions: gateway.GetOptions(viper.GetViper()),
}
require.Equal(t, expectedConfig, coreConfig)
}
Expand Down
7 changes: 6 additions & 1 deletion internal/peer/node/start.go
Expand Up @@ -825,7 +825,12 @@ func serve(args []string) error {
logger.Info("Starting peer with Gateway enabled")
gatewayprotos.RegisterGatewayServer(
peerServer.Server(),
gateway.CreateServer(&gateway.EndorserServerAdapter{Server: serverEndorser}, discoveryService, peerInstance.GossipService.SelfMembershipInfo().Endpoint),
gateway.CreateServer(
&gateway.EndorserServerAdapter{Server: serverEndorser},
discoveryService,
peerInstance.GossipService.SelfMembershipInfo().Endpoint,
coreConfig.GatewayOptions,
),
)
} else {
logger.Warning("Discovery service must be enabled for embedded gateway")
Expand Down
36 changes: 29 additions & 7 deletions internal/pkg/gateway/api.go
Expand Up @@ -9,12 +9,18 @@ package gateway
import (
"context"
"fmt"
"sync"

gp "github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/protoutil"
)

type endorserResponse struct {
pr *peer.ProposalResponse
err error
}

// Evaluate will invoke the transaction function as specified in the SignedProposal
func (gs *Server) Evaluate(ctx context.Context, proposedTransaction *gp.ProposedTransaction) (*gp.Result, error) {
if proposedTransaction == nil {
Expand Down Expand Up @@ -65,17 +71,33 @@ func (gs *Server) Endorse(ctx context.Context, proposedTransaction *gp.ProposedT
return nil, err
}

var responses []*peer.ProposalResponse
// send to all the endorsers - TODO fan out in parallel
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()

var wg sync.WaitGroup
responseCh := make(chan *endorserResponse, len(endorsers))
// send to all the endorsers
for _, endorser := range endorsers {
response, err := endorser.ProcessProposal(ctx, signedProposal)
if err != nil {
return nil, fmt.Errorf("failed to process proposal: %w", err)
wg.Add(1)
go func(endorser peer.EndorserClient) {
defer wg.Done()
response, err := endorser.ProcessProposal(ctx, signedProposal)
responseCh <- &endorserResponse{pr: response, err: err}
}(endorser)
}
wg.Wait()
close(responseCh)

var responses []*peer.ProposalResponse
for response := range responseCh {
if response.err != nil {
// TODO: we should retry, or attempt a different endorsement layout (if available)
return nil, fmt.Errorf("failed to process proposal: %w", response.err)
}
responses = append(responses, response)
responses = append(responses, response.pr)
}

env, err := createUnsignedTx(proposal, responses...)
env, err := protoutil.CreateTx(proposal, responses...)
if err != nil {
return nil, fmt.Errorf("failed to assemble transaction: %w", err)
}
Expand Down
19 changes: 16 additions & 3 deletions internal/pkg/gateway/api_test.go
Expand Up @@ -10,6 +10,9 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/spf13/viper"

"github.com/golang/protobuf/proto"
cp "github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -74,6 +77,7 @@ type contextKey string
func TestGateway(t *testing.T) {
const testChannel = "test_channel"
const testChaincode = "test_chaincode"
const endorsementTimeout = -1 * time.Second

mockSigner := &idmocks.SignerSerializer{}
mockSigner.SignReturns([]byte("my_signature"), nil)
Expand Down Expand Up @@ -118,7 +122,12 @@ func TestGateway(t *testing.T) {
tt.setupDiscovery(disc)
}

server := CreateServer(localEndorser, disc, "localhost:7051")
options := Options{
Enabled: true,
EndorsementTimeout: endorsementTimeout,
}

server := CreateServer(localEndorser, disc, "localhost:7051", options)

factory := &endpointFactory{
t: t,
Expand Down Expand Up @@ -306,7 +315,11 @@ func TestGateway(t *testing.T) {
require.Equal(t, 1, localEndorser.ProcessProposalCallCount())
ectx, prop, _ := localEndorser.ProcessProposalArgsForCall(0)
require.Equal(t, tt.signedProposal, prop)
require.Same(t, ctx, ectx)
require.Equal(t, "apples", ectx.Value(contextKey("orange")))
// context timeout was set to -1s, so deadline should be in the past
deadline, ok := ectx.Deadline()
require.True(t, ok)
require.Negative(t, time.Until(deadline))

require.Equal(t, testChannel, preparedTxn.ChannelId)
// check the prepare transaction (Envelope) contains the right number of endorsements
Expand Down Expand Up @@ -449,7 +462,7 @@ func TestGateway(t *testing.T) {
}

func TestNilArgs(t *testing.T) {
server := CreateServer(&mocks.EndorserClient{}, &mocks.Discovery{}, "localhost:7051")
server := CreateServer(&mocks.EndorserClient{}, &mocks.Discovery{}, "localhost:7051", GetOptions(viper.New()))
ctx := context.Background()

_, err := server.Evaluate(ctx, nil)
Expand Down
38 changes: 0 additions & 38 deletions internal/pkg/gateway/apiutils.go
Expand Up @@ -9,49 +9,11 @@ package gateway
import (
"fmt"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/protoutil"
)

type nilSigner struct {
creator []byte
}

func (s *nilSigner) Sign([]byte) ([]byte, error) {
return nil, nil
}

func (s *nilSigner) Serialize() ([]byte, error) {
return s.creator, nil
}

func createUnsignedTx(
proposal *peer.Proposal,
resps ...*peer.ProposalResponse,
) (*common.Envelope, error) {
// extract the Creator from the signature header
hdr, err := protoutil.UnmarshalHeader(proposal.Header)
if err != nil {
return nil, err
}
shdr, err := protoutil.UnmarshalSignatureHeader(hdr.SignatureHeader)
if err != nil {
return nil, err
}

// TODO the creation of this dummy signer containing the serialised creator from the Proposal
// is required because protoutil.CreateSignedTx contains a check that they match.
// However, there is a comment there about removing that check. If removed, the code could be
// refactored to remove the need for this kludge.
dummySigner := &nilSigner{
creator: shdr.Creator,
}

return protoutil.CreateSignedTx(proposal, dummySigner, resps...)
}

func getValueFromResponse(response *peer.ProposalResponse) (*gateway.Result, error) {
var retVal []byte

Expand Down
26 changes: 22 additions & 4 deletions internal/pkg/gateway/config.go
Expand Up @@ -6,15 +6,33 @@ SPDX-License-Identifier: Apache-2.0

package gateway

import (
"time"

"github.com/spf13/viper"
)

// GatewayOptions is used to configure the gateway settings
type Options struct {
// GatewayEnabled is used to enable the gateway service
Enabled bool
Enabled bool
EndorsementTimeout time.Duration
}

var defaultOptions = Options{
Enabled: false,
EndorsementTimeout: 10 * time.Second,
}

// DefaultOptions gets the default Gateway configuration Options
func DefaultOptions() Options {
return Options{
Enabled: false,
func GetOptions(v *viper.Viper) Options {
options := defaultOptions
if v.IsSet("peer.gateway.enabled") {
options.Enabled = v.GetBool("peer.gateway.enabled")
}
if v.IsSet("peer.gateway.endorsementTimeout") {
options.EndorsementTimeout = v.GetDuration("peer.gateway.endorsementTimeout")
}

return options
}
42 changes: 42 additions & 0 deletions internal/pkg/gateway/config_test.go
@@ -0,0 +1,42 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package gateway

import (
"bytes"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)

var testConfig = []byte(`
peer:
gateway:
enabled: true
endorsementTimeout: 30s
`)

func TestDefaultOptions(t *testing.T) {
v := viper.New()
options := GetOptions(v)
require.Equal(t, defaultOptions, options)
}

func TestOverriddenOptions(t *testing.T) {
v := viper.New()
v.SetConfigType("yaml")
v.ReadConfig(bytes.NewBuffer(testConfig))
options := GetOptions(v)

expectedOptions := Options{
Enabled: true,
EndorsementTimeout: 30 * time.Second,
}
require.Equal(t, expectedOptions, options)
}
4 changes: 3 additions & 1 deletion internal/pkg/gateway/gateway.go
Expand Up @@ -19,6 +19,7 @@ var logger = flogging.MustGetLogger("gateway")
// Server represents the GRPC server for the Gateway.
type Server struct {
registry *registry
options Options
}

type EndorserServerAdapter struct {
Expand All @@ -30,7 +31,7 @@ func (e *EndorserServerAdapter) ProcessProposal(ctx context.Context, req *peer.S
}

// CreateServer creates an embedded instance of the Gateway.
func CreateServer(localEndorser peer.EndorserClient, discovery Discovery, selfEndpoint string) *Server {
func CreateServer(localEndorser peer.EndorserClient, discovery Discovery, selfEndpoint string, options Options) *Server {
gwServer := &Server{
registry: &registry{
localEndorser: localEndorser,
Expand All @@ -44,6 +45,7 @@ func CreateServer(localEndorser peer.EndorserClient, discovery Discovery, selfEn
tlsRootCerts: map[string][][]byte{},
channelsInitialized: map[string]bool{},
},
options: options,
}

return gwServer
Expand Down