Skip to content

Commit

Permalink
[FAB-8330] grpcs fallback to grpc when failed
Browse files Browse the repository at this point in the history
For Orderer, Peer, Events when protocol is not mentioned
in the URL,

 - GRPCS will be tried first
 - If connection fails, it will fall back to GRPC when
   'allow-insecure=true'




Change-Id: I1421fa4f60702a7ea1980ad4dfe600d08d752c43
Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
  • Loading branch information
sudeshrshetty committed Feb 16, 2018
1 parent 614551a commit 89112b8
Show file tree
Hide file tree
Showing 24 changed files with 460 additions and 174 deletions.
2 changes: 1 addition & 1 deletion api/apifabclient/event.go
Expand Up @@ -16,7 +16,7 @@ import (

// EventHub ...
type EventHub interface {
SetPeerAddr(peerURL string, certificate *x509.Certificate, serverHostOverride string)
SetPeerAddr(peerURL string, certificate *x509.Certificate, serverHostOverride string, allowInsecure bool)
IsConnected() bool
Connect() error
Disconnect() error
Expand Down
8 changes: 6 additions & 2 deletions pkg/config/comm/comm.go
Expand Up @@ -19,10 +19,14 @@ import (
// TLSConfig returns the appropriate config for TLS including the root CAs,
// certs for mutual TLS, and server host override. Works with certs loaded either from a path or embedded pem.
func TLSConfig(cert *x509.Certificate, serverName string, config apiconfig.Config) (*tls.Config, error) {
certPool, _ := config.TLSCACertPool()
certPool, err := config.TLSCACertPool()
if err != nil {
return nil, err
}

if cert == nil && (certPool == nil || len(certPool.Subjects()) == 0) {
return nil, errors.New("certificate is required")
//Return empty tls config if there is no cert provided or if certpool unavailable
return &tls.Config{}, nil
}

tlsCaCertPool, err := config.TLSCACertPool(cert)
Expand Down
24 changes: 0 additions & 24 deletions pkg/config/comm/comm_test.go
Expand Up @@ -8,7 +8,6 @@ package comm

import (
"bytes"
"crypto/x509"
"encoding/hex"
"testing"

Expand All @@ -22,29 +21,6 @@ import (
"github.com/hyperledger/fabric-sdk-go/api/apiconfig/mocks"
)

func TestTLSConfigEmptyCertPoolAndCertificate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
config := mock_apiconfig.NewMockConfig(mockCtrl)

// nil cert pool
config.EXPECT().TLSCACertPool().Return(nil, nil)

_, err := TLSConfig(nil, "", config)
if err == nil {
t.Fatal("Expected failure with nil cert pool")
}

// empty cert pool
certPool := x509.NewCertPool()
config.EXPECT().TLSCACertPool().Return(certPool, nil)

_, err = TLSConfig(nil, "", config)
if err == nil {
t.Fatal("Expected failure with empty cert pool")
}
}

func TestTLSConfigErrorAddingCertificate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/testdata/config_test_pem.yaml
Expand Up @@ -239,6 +239,7 @@ orderers:
grpcOptions:
ssl-target-name-override: orderer.example.com
grpc-max-send-message-length: 15
allow-insecure: false

tlsCACerts:
# pem supersedes path
Expand Down Expand Up @@ -276,6 +277,7 @@ peers:
grpcOptions:
ssl-target-name-override: peer0.org1.example.com
grpc.http2.keepalive_time: 15
allow-insecure: false

tlsCACerts:
pem: |
Expand Down Expand Up @@ -303,6 +305,7 @@ peers:
eventUrl: grpcs://peer0.org2.example.com:7053
grpcOptions:
ssl-target-name-override: peer0.org2.example.com
allow-insecure: false
tlsCACerts:
pem:
path:
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/testdata/template/config.yaml
Expand Up @@ -209,6 +209,9 @@ orderers:
#fail-fast is action to take when an RPC is attempted on broken connections or unreachable servers
# fail-fast: true

# When no protocol provided in url, grpcs connection will be tried first, if failed it falls back to grpc when this option set to true
# allow-insecure: true

# tlsCACerts:
# Certificate location absolute path
# path: ${GOPATH}/src/github.com/hyperledger/fabric-sdk-go/test/fixtures/channel/crypto-config/ordererOrganizations/example.com/tlsca/tlsca.example.com-cert.pem
Expand All @@ -228,6 +231,8 @@ peers:
# grpcOptions:
# ssl-target-name-override: peer0.org1.example.com
# grpc.http2.keepalive_time: 15
# When no protocol provided in url, grpcs connection will be tried first, if failed it falls back to grpc when this option set to true
# allow-insecure: true

# tlsCACerts:
# Certificate location absolute path
Expand Down
22 changes: 19 additions & 3 deletions pkg/config/urlutil/urlutils.go
Expand Up @@ -9,6 +9,8 @@ package urlutil
import (
"strings"

"regexp"

"github.com/hyperledger/fabric-sdk-go/pkg/logging"
)

Expand All @@ -33,8 +35,22 @@ func ToAddress(url string) string {
if strings.HasPrefix(url, "grpcs://") {
return strings.TrimPrefix(url, "grpcs://")
}
if !strings.HasPrefix(url, "http://") && !strings.HasPrefix(url, "https://") {
logger.Warnf("URL '%s' has no prefix. Please enter a prefix as it will be mandatory in a future release", url)
}
return url
}

//AttemptSecured is a utility function which verifies URL and returns if secured connections needs to established
func AttemptSecured(url string) bool {
ok, err := regexp.MatchString(".*(?i)s://", url)
if ok && err == nil {
return true
} else if !strings.Contains(url, "://") {
return true
} else {
return false
}
}

//HasProtocol is a utility function which verifies if protocol is provided in URL
func HasProtocol(url string) bool {
return strings.Contains(url, "://")
}
11 changes: 1 addition & 10 deletions pkg/fabric-client/channel/channel.go
Expand Up @@ -9,7 +9,6 @@ package channel
import (
"crypto/x509"
"encoding/pem"
"regexp"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -91,7 +90,7 @@ func New(ctx fab.Context, cfg fab.ChannelCfg) (*Channel, error) {

var o *orderer.Orderer
if oCfg == nil {
o, err = orderer.New(ctx.Config(), orderer.WithURL(resolveOrdererURL(name)), orderer.WithServerName(resolveOrdererAddress(name)))
o, err = orderer.New(ctx.Config(), orderer.WithURL(name), orderer.WithServerName(resolveOrdererAddress(name)))
} else {
o, err = orderer.New(ctx.Config(), orderer.FromOrdererConfig(oCfg))
}
Expand Down Expand Up @@ -319,14 +318,6 @@ func resolveOrdererAddress(ordererAddress string) string {
return ordererAddress
}

// resolveOrdererURL resolves order URL to prefix protocol if not present
func resolveOrdererURL(ordererURL string) string {
if ok, err := regexp.MatchString(".*://", ordererURL); ok && err == nil {
return ordererURL
}
return "grpcs://" + ordererURL
}

// QueryInfo queries for various useful information on the state of the channel
// (height, known peers).
// This query will be made to the primary peer.
Expand Down
25 changes: 21 additions & 4 deletions pkg/fabric-client/events/consumer/consumer.go
Expand Up @@ -53,12 +53,15 @@ type eventsClient struct {
processEventsCompleted chan struct{}
kap keepalive.ClientParameters
failFast bool
secured bool
allowInsecure bool
}

//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
func NewEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate,
serverhostoverride string, regTimeout time.Duration, adapter consumer.EventAdapter,
kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error) {
kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) {

var err error
if regTimeout < 100*time.Millisecond {
regTimeout = 100 * time.Millisecond
Expand All @@ -80,14 +83,17 @@ func NewEventsClient(provider fab.ProviderContext, identity fab.IdentityContext,
tlsCertHash: ccomm.TLSCertHash(provider.Config()),
kap: kap,
failFast: failFast,
secured: urlutil.AttemptSecured(peerAddress),
allowInsecure: allowInsecure,
}, err
}

//newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER.
func newEventsClientConnectionWithAddress(peerAddress string, cert *x509.Certificate, serverHostOverride string,
config apiconfig.Config, kap keepalive.ClientParameters, failFast bool) (*grpc.ClientConn, error) {
config apiconfig.Config, kap keepalive.ClientParameters, failFast bool, secured bool) (*grpc.ClientConn, error) {
var opts []grpc.DialOption
if urlutil.IsTLSEnabled(peerAddress) {
opts = append(opts, grpc.WithTimeout(config.TimeoutOrDefault(apiconfig.EventHub)))
if secured {
tlsConfig, err := comm.TLSConfig(cert, serverHostOverride, config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -301,8 +307,13 @@ func (ec *eventsClient) processEvents() error {

//Start establishes connection with Event hub and registers interested events with it
func (ec *eventsClient) Start() error {
return ec.establishConnectionAndRegister(ec.secured)
}

func (ec *eventsClient) establishConnectionAndRegister(secured bool) error {
conn, err := newEventsClientConnectionWithAddress(ec.peerAddress, ec.TLSCertificate, ec.TLSServerHostOverride,
ec.provider.Config(), ec.kap, ec.failFast)
ec.provider.Config(), ec.kap, ec.failFast, secured)

if err != nil {
return errors.WithMessage(err, "events connection failed")
}
Expand All @@ -320,6 +331,12 @@ func (ec *eventsClient) Start() error {
serverClient := ehpb.NewEventsClient(conn)
ec.stream, err = serverClient.Chat(context.Background())
if err != nil {
logger.Error("events connection failed, cause: ", err)
if secured && ec.allowInsecure {
//If secured mode failed and allow insecure is enabled then retry in insecure mode
logger.Debug("Secured establishConnectionAndRegister failed, attempting insecured")
return ec.establishConnectionAndRegister(false)
}
return errors.Wrap(err, "events connection failed")
}

Expand Down
33 changes: 24 additions & 9 deletions pkg/fabric-client/events/eventhub.go
Expand Up @@ -60,23 +60,24 @@ type EventHub struct {
// Factory that creates EventsClient
eventsClientFactory eventClientFactory
// FabricClient
provider fab.ProviderContext
identity fab.IdentityContext
kap keepalive.ClientParameters
failFast bool
provider fab.ProviderContext
identity fab.IdentityContext
kap keepalive.ClientParameters
failFast bool
allowInsecure bool
}

// eventClientFactory creates an EventsClient instance
type eventClientFactory interface {
newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error)
newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error)
}

// consumerClientFactory is the default implementation oif the eventClientFactory
type consumerClientFactory struct{}

func (ccf *consumerClientFactory) newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string,
regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error) {
return consumer.NewEventsClient(provider, identity, peerAddress, certificate, serverHostOverride, regTimeout, adapter, kap, failFast)
regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) {
return consumer.NewEventsClient(provider, identity, peerAddress, certificate, serverHostOverride, regTimeout, adapter, kap, failFast, allowInsecure)
}

// Context holds the providers and services needed to create an EventHub.
Expand Down Expand Up @@ -126,6 +127,8 @@ func FromConfig(ctx Context, peerCfg *apiconfig.PeerConfig) (*EventHub, error) {
eventHub.peerTLSServerHostOverride = serverHostOverride
eventHub.kap = getKeepAliveOptions(peerCfg)
eventHub.failFast = getFailFast(peerCfg)
eventHub.allowInsecure = isInsecureConnectionAllowed(peerCfg)

return eventHub, nil
}

Expand Down Expand Up @@ -278,10 +281,12 @@ func (eventHub *EventHub) removeChaincodeInterest(ChaincodeID string, EventName
// peeraddr peer url
// peerTLSCertificate peer tls certificate
// peerTLSServerHostOverride tls serverhostoverride
func (eventHub *EventHub) SetPeerAddr(peerURL string, peerTLSCertificate *x509.Certificate, peerTLSServerHostOverride string) {
// inSecure option enables grpc retry when grpcs fails (only when no protocol provided in peerURL)
func (eventHub *EventHub) SetPeerAddr(peerURL string, peerTLSCertificate *x509.Certificate, peerTLSServerHostOverride string, allowInsecure bool) {
eventHub.peerAddr = peerURL
eventHub.peerTLSCertificate = peerTLSCertificate
eventHub.peerTLSServerHostOverride = peerTLSServerHostOverride
eventHub.allowInsecure = allowInsecure && !urlutil.HasProtocol(peerURL)
}

// IsConnected gets connected state of eventhub
Expand Down Expand Up @@ -312,7 +317,7 @@ func (eventHub *EventHub) Connect() error {
if eventHub.grpcClient == nil {
eventsClient, _ := eventHub.eventsClientFactory.newEventsClient(eventHub.provider, eventHub.identity,
eventHub.peerAddr, eventHub.peerTLSCertificate, eventHub.peerTLSServerHostOverride,
eventHub.provider.Config().TimeoutOrDefault(apiconfig.EventReg), eventHub, eventHub.kap, eventHub.failFast)
eventHub.provider.Config().TimeoutOrDefault(apiconfig.EventReg), eventHub, eventHub.kap, eventHub.failFast, eventHub.allowInsecure)
eventHub.grpcClient = eventsClient
}

Expand Down Expand Up @@ -602,3 +607,13 @@ func (eventHub *EventHub) notifyChaincodeRegistrants(channelID string, ccEvent *
}
}
}

func isInsecureConnectionAllowed(peerCfg *apiconfig.PeerConfig) bool {
//allowInsecure used only when protocol is missing from URL
allowInsecure := !urlutil.HasProtocol(peerCfg.URL)
boolVal, ok := peerCfg.GRPCOptions["allow-insecure"].(bool)
if ok {
return allowInsecure && boolVal
}
return false
}
4 changes: 2 additions & 2 deletions pkg/fabric-client/events/eventmocks.go
Expand Up @@ -45,7 +45,7 @@ type mockEventClientFactory struct {
}

func (mecf *mockEventClientFactory) newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration,
adapter fcConsumer.EventAdapter, kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error) {
adapter fcConsumer.EventAdapter, kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) {
mec := &mockEventClient{
PeerAddress: peerAddress,
RegTimeout: regTimeout,
Expand Down Expand Up @@ -121,7 +121,7 @@ func createMockedEventHub() (*EventHub, *mockEventClientFactory, error) {
var clientFactory mockEventClientFactory
eventHub.eventsClientFactory = &clientFactory

eventHub.SetPeerAddr("mock://mock", nil, "")
eventHub.SetPeerAddr("mock://mock", nil, "", true)

err = eventHub.Connect()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/fabric-client/mocks/mockeventhub.go
Expand Up @@ -26,7 +26,7 @@ func NewMockEventHub() *MockEventHub {
}

// SetPeerAddr not implemented
func (m *MockEventHub) SetPeerAddr(peerURL string, certificate *x509.Certificate, serverHostOverride string) {
func (m *MockEventHub) SetPeerAddr(peerURL string, certificate *x509.Certificate, serverHostOverride string, allowInsecure bool) {
// Not implemented
}

Expand Down

0 comments on commit 89112b8

Please sign in to comment.