Skip to content

Commit

Permalink
Allow seperate TLS config for cluster and client (#393)
Browse files Browse the repository at this point in the history
When intra cluster communication is configured to use seperate
listener, allow TLS being disabled for the client-facing listener,
and assume TLS is always enabled for cluster listener (no extra
config option).

FAB-15648

This is cherry-pick from master

Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger authored and Jason Yellick committed Jan 6, 2020
1 parent d4dca95 commit a8639ba
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 77 deletions.
114 changes: 66 additions & 48 deletions orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,8 @@ func Start(cmd string, conf *localconfig.TopLevel) {
sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)

clusterType := isClusterType(clusterBootBlock)
signer := localmsp.NewSigner()

clusterClientConfig := initializeClusterClientConfig(conf, clusterType, bootstrapBlock)
clusterDialer := &cluster.PredicateDialer{
ClientConfig: clusterClientConfig,
}

r := createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
// Only clusters that are equipped with a recent config block can replicate.
if clusterType && conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}

logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.Global.SetObserver(logObserver)

Expand All @@ -135,27 +123,51 @@ func Start(cmd string, conf *localconfig.TopLevel) {
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}

var r *replicationInitiator
clusterServerConfig := serverConfig
clusterGRPCServer := grpcServer
clusterGRPCServer := grpcServer // by default, cluster shares the same grpc server
clusterClientConfig := comm.ClientConfig{SecOpts: &comm.SecureOptions{}, KaOpts: &comm.KeepaliveOptions{}}
var clusterDialer *cluster.PredicateDialer

var reuseGrpcListener bool
typ := consensusType(bootstrapBlock)
var serversToUpdate []*comm.GRPCServer

clusterType := isClusterType(clusterBootBlock)
if clusterType {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, grpcServer, ioutil.ReadFile)
logger.Infof("Setting up cluster for orderer type %s", typ)

clusterClientConfig = initializeClusterClientConfig(conf)
clusterDialer = &cluster.PredicateDialer{
ClientConfig: clusterClientConfig,
}

r = createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
// Only clusters that are equipped with a recent config block can replicate.
if conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}

if reuseGrpcListener = reuseListener(conf, typ); !reuseGrpcListener {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, ioutil.ReadFile)
}

// If we have a separate gRPC server for the cluster,
// we need to update its TLS CA certificate pool.
serversToUpdate = append(serversToUpdate, clusterGRPCServer)
}

var servers = []*comm.GRPCServer{grpcServer}
// If we have a separate gRPC server for the cluster, we need to update its TLS
// CA certificate pool too.
if clusterGRPCServer != grpcServer {
servers = append(servers, clusterGRPCServer)
// if cluster is reusing client-facing server, then it is already
// appended to serversToUpdate at this point.
if grpcServer.MutualTLSRequired() && !reuseGrpcListener {
serversToUpdate = append(serversToUpdate, grpcServer)
}

tlsCallback := func(bundle *channelconfig.Bundle) {
// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
if grpcServer.MutualTLSRequired() || clusterType {
logger.Debug("Executing callback to update root CAs")
updateTrustedRoots(caSupport, bundle, servers...)
if clusterType {
updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
}
logger.Debug("Executing callback to update root CAs")
updateTrustedRoots(caSupport, bundle, serversToUpdate...)
if clusterType {
updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
}
}

Expand Down Expand Up @@ -189,7 +201,7 @@ func Start(cmd string, conf *localconfig.TopLevel) {
},
}))

if clusterGRPCServer != grpcServer {
if !reuseGrpcListener && clusterType {
logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
go clusterGRPCServer.Start()
}
Expand All @@ -200,6 +212,29 @@ func Start(cmd string, conf *localconfig.TopLevel) {
grpcServer.Start()
}

func reuseListener(conf *localconfig.TopLevel, typ string) bool {
clusterConf := conf.General.Cluster
// If listen address is not configured, and the TLS certificate isn't configured,
// it means we use the general listener of the node.
if clusterConf.ListenPort == 0 && clusterConf.ServerCertificate == "" && clusterConf.ListenAddress == "" && clusterConf.ServerPrivateKey == "" {
logger.Info("Cluster listener is not configured, defaulting to use the general listener on port", conf.General.ListenPort)

if !conf.General.TLS.Enabled {
logger.Panicf("TLS is required for running ordering nodes of type %s.", typ)
}

return true
}

// Else, one of the above is defined, so all 4 properties should be defined.
if clusterConf.ListenPort == 0 || clusterConf.ServerCertificate == "" || clusterConf.ListenAddress == "" || clusterConf.ServerPrivateKey == "" {
logger.Panic("Options: General.Cluster.ListenPort, General.Cluster.ListenAddress, General.Cluster.ServerCertificate," +
" General.Cluster.ServerPrivateKey, should be defined altogether.")
}

return false
}

// Extract system channel last config block
func extractSysChanLastConfig(lf blockledger.Factory, bootstrapBlock *cb.Block) *cb.Block {
// Are we bootstrapping?
Expand Down Expand Up @@ -331,23 +366,9 @@ func handleSignals(handlers map[os.Signal]func()) {

type loadPEMFunc func(string) ([]byte, error)

// configureClusterListener gets a ServerConfig and a GRPCServer, and:
// 1) If the TopLevel configuration states that the cluster configuration for the cluster gRPC service is missing, returns them back.
// 2) Else, returns a new ServerConfig and a new gRPC server (with its own TLS listener on a different port).
func configureClusterListener(conf *localconfig.TopLevel, generalConf comm.ServerConfig, generalSrv *comm.GRPCServer, loadPEM loadPEMFunc) (comm.ServerConfig, *comm.GRPCServer) {
// configureClusterListener returns a new ServerConfig and a new gRPC server (with its own TLS listener).
func configureClusterListener(conf *localconfig.TopLevel, generalConf comm.ServerConfig, loadPEM loadPEMFunc) (comm.ServerConfig, *comm.GRPCServer) {
clusterConf := conf.General.Cluster
// If listen address is not configured, or the TLS certificate isn't configured,
// it means we use the general listener of the node.
if clusterConf.ListenPort == 0 && clusterConf.ServerCertificate == "" && clusterConf.ListenAddress == "" && clusterConf.ServerPrivateKey == "" {
logger.Info("Cluster listener is not configured, defaulting to use the general listener on port", conf.General.ListenPort)
return generalConf, generalSrv
}

// Else, one of the above is defined, so all 4 properties should be defined.
if clusterConf.ListenPort == 0 || clusterConf.ServerCertificate == "" || clusterConf.ListenAddress == "" || clusterConf.ServerPrivateKey == "" {
logger.Panic("Options: General.Cluster.ListenPort, General.Cluster.ListenAddress, General.Cluster.ServerCertificate," +
" General.Cluster.ServerPrivateKey, should be defined altogether.")
}

cert, err := loadPEM(clusterConf.ServerCertificate)
if err != nil {
Expand Down Expand Up @@ -398,18 +419,15 @@ func configureClusterListener(conf *localconfig.TopLevel, generalConf comm.Serve
return serverConf, srv
}

func initializeClusterClientConfig(conf *localconfig.TopLevel, clusterType bool, bootstrapBlock *cb.Block) comm.ClientConfig {
if clusterType && !conf.General.TLS.Enabled {
logger.Panicf("TLS is required for running ordering nodes of type %s.", consensusType(bootstrapBlock))
}
func initializeClusterClientConfig(conf *localconfig.TopLevel) comm.ClientConfig {
cc := comm.ClientConfig{
AsyncConnect: true,
KaOpts: comm.DefaultKeepaliveOptions,
Timeout: conf.General.Cluster.DialTimeout,
SecOpts: &comm.SecureOptions{},
}

if (!conf.General.TLS.Enabled) || conf.General.Cluster.ClientCertificate == "" {
if conf.General.Cluster.ClientCertificate == "" {
return cc
}

Expand Down
82 changes: 53 additions & 29 deletions orderer/common/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestInitializeServerConfig(t *testing.T) {
if tc.clusterCert == "" {
initializeServerConfig(conf, nil)
} else {
initializeClusterClientConfig(conf, false, nil)
initializeClusterClientConfig(conf)
}
},
)
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
OrdererRootCAsByChainAndOrg: make(comm.OrgRootCAs),
}

clusterConf := initializeClusterClientConfig(conf, true, nil)
clusterConf := initializeClusterClientConfig(conf)
predDialer := &cluster.PredicateDialer{
ClientConfig: clusterConf,
}
Expand Down Expand Up @@ -526,30 +526,6 @@ func TestConfigureClusterListener(t *testing.T) {
expectedPanic string
expectedLogEntries []string
}{
{
name: "no separate listener",
shouldBeEqual: true,
generalConf: comm.ServerConfig{},
conf: &localconfig.TopLevel{},
generalSrv: &comm.GRPCServer{},
},
{
name: "partial configuration",
generalConf: comm.ServerConfig{},
conf: &localconfig.TopLevel{
General: localconfig.General{
Cluster: localconfig.Cluster{
ListenPort: 5000,
},
},
},
expectedPanic: "Options: General.Cluster.ListenPort, General.Cluster.ListenAddress, " +
"General.Cluster.ServerCertificate, General.Cluster.ServerPrivateKey, should be defined altogether.",
generalSrv: &comm.GRPCServer{},
expectedLogEntries: []string{"Options: General.Cluster.ListenPort, General.Cluster.ListenAddress, " +
"General.Cluster.ServerCertificate," +
" General.Cluster.ServerPrivateKey, should be defined altogether."},
},
{
name: "invalid certificate",
generalConf: comm.ServerConfig{},
Expand Down Expand Up @@ -641,18 +617,18 @@ func TestConfigureClusterListener(t *testing.T) {
} {
t.Run(testCase.name, func(t *testing.T) {
if testCase.shouldBeEqual {
conf, srv := configureClusterListener(testCase.conf, testCase.generalConf, testCase.generalSrv, loadPEM)
conf, srv := configureClusterListener(testCase.conf, testCase.generalConf, loadPEM)
assert.Equal(t, conf, testCase.generalConf)
assert.Equal(t, srv, testCase.generalSrv)
}

if testCase.expectedPanic != "" {
f := func() {
configureClusterListener(testCase.conf, testCase.generalConf, testCase.generalSrv, loadPEM)
configureClusterListener(testCase.conf, testCase.generalConf, loadPEM)
}
assert.Contains(t, panicMsg(f), testCase.expectedPanic)
} else {
configureClusterListener(testCase.conf, testCase.generalConf, testCase.generalSrv, loadPEM)
configureClusterListener(testCase.conf, testCase.generalConf, loadPEM)
}
// Ensure logged messages that are expected were all logged
var loggedMessages []string
Expand All @@ -665,6 +641,54 @@ func TestConfigureClusterListener(t *testing.T) {
}
}

func TestReuseListener(t *testing.T) {
t.Run("good to reuse", func(t *testing.T) {
top := &localconfig.TopLevel{General: localconfig.General{TLS: localconfig.TLS{Enabled: true}}}
require.True(t, reuseListener(top, "foo"))
})

t.Run("reuse tls disabled", func(t *testing.T) {
top := &localconfig.TopLevel{}
require.PanicsWithValue(
t,
"TLS is required for running ordering nodes of type foo.",
func() { reuseListener(top, "foo") },
)
})

t.Run("good not to reuse", func(t *testing.T) {
top := &localconfig.TopLevel{
General: localconfig.General{
Cluster: localconfig.Cluster{
ListenAddress: "127.0.0.1",
ListenPort: 5000,
ServerPrivateKey: "key",
ServerCertificate: "bad",
},
},
}
require.False(t, reuseListener(top, "foo"))
})

t.Run("partial config", func(t *testing.T) {
top := &localconfig.TopLevel{
General: localconfig.General{
Cluster: localconfig.Cluster{
ListenAddress: "127.0.0.1",
ListenPort: 5000,
ServerCertificate: "bad",
},
},
}
require.PanicsWithValue(
t,
"Options: General.Cluster.ListenPort, General.Cluster.ListenAddress,"+
" General.Cluster.ServerCertificate, General.Cluster.ServerPrivateKey, should be defined altogether.",
func() { reuseListener(top, "foo") },
)
})
}

func TestInitializeEtcdraftConsenter(t *testing.T) {
consenters := make(map[string]consensus.Consenter)
rlf := ramledger.New(10)
Expand Down

0 comments on commit a8639ba

Please sign in to comment.