Skip to content

Commit a90caeb

Browse files
committed
[FAB-12579] Separate TLS listener for intra-cluster
This change set, adds an option for a separate TLS listener for intra-cluster communication. Change-Id: I059e4d45ddeaf066017c758b83a3e7422783a403 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 910c769 commit a90caeb

File tree

5 files changed

+324
-23
lines changed

5 files changed

+324
-23
lines changed

orderer/common/localconfig/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,13 @@ type General struct {
5858
}
5959

6060
type Cluster struct {
61-
RootCAs []string
61+
ListenAddress string
62+
ListenPort uint16
63+
ServerCertificate string
64+
ServerPrivateKey string
6265
ClientCertificate string
6366
ClientPrivateKey string
67+
RootCAs []string
6468
DialTimeout time.Duration
6569
RPCTimeout time.Duration
6670
ReplicationBufferSize int

orderer/common/server/etcdraft_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ func TestSpawnEtcdRaft(t *testing.T) {
5858

5959
// Launch the OSN
6060
ordererProcess := launchOrderer(gt, cmd, orderer, tempDir, genesisBlockPath, fabricRootDir)
61+
defer ordererProcess.Kill()
62+
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Starting cluster listener on 127.0.0.1:5612"))
6163
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Beginning to serve requests"))
6264
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader"))
63-
ordererProcess.Kill()
6465
}
6566

6667
func launchOrderer(gt *GomegaWithT, cmd *exec.Cmd, orderer, tempDir, genesisBlockPath, fabricRootDir string) *gexec.Session {
@@ -78,6 +79,10 @@ func launchOrderer(gt *GomegaWithT, cmd *exec.Cmd, orderer, tempDir, genesisBloc
7879
"ORDERER_OPERATIONS_TLS_ENABLED=false",
7980
fmt.Sprintf("ORDERER_FILELEDGER_LOCATION=%s", filepath.Join(tempDir, "ledger")),
8081
fmt.Sprintf("ORDERER_GENERAL_GENESISFILE=%s", genesisBlockPath),
82+
"ORDERER_GENERAL_CLUSTER_LISTENPORT=5612",
83+
"ORDERER_GENERAL_CLUSTER_LISTENADDRESS=127.0.0.1",
84+
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_SERVERCERTIFICATE=%s", filepath.Join(cwd, "testdata", "tls", "server.crt")),
85+
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_SERVERPRIVATEKEY=%s", filepath.Join(cwd, "testdata", "tls", "server.key")),
8186
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_CLIENTCERTIFICATE=%s", filepath.Join(cwd, "testdata", "tls", "server.crt")),
8287
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_CLIENTPRIVATEKEY=%s", filepath.Join(cwd, "testdata", "tls", "server.key")),
8388
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_ROOTCAS=[%s]", filepath.Join(cwd, "testdata", "tls", "ca.crt")),

orderer/common/server/main.go

Lines changed: 110 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import (
5050
"github.com/hyperledger/fabric/protos/utils"
5151
"go.uber.org/zap/zapcore"
5252
"google.golang.org/grpc"
53-
"gopkg.in/alecthomas/kingpin.v2"
53+
kingpin "gopkg.in/alecthomas/kingpin.v2"
5454
)
5555

5656
var logger = flogging.MustGetLogger("orderer.common.server")
@@ -97,14 +97,14 @@ func Start(cmd string, conf *localconfig.TopLevel) {
9797
lf, _ := createLedgerFactory(conf)
9898

9999
clusterDialer := &cluster.PredicateDialer{}
100-
clusterConfig := initializeClusterConfig(conf)
101-
clusterDialer.SetConfig(clusterConfig)
100+
clusterClientConfig := initializeClusterClientConfig(conf)
101+
clusterDialer.SetConfig(clusterClientConfig)
102102

103103
// Only clusters that are equipped with a recent config block can replicate.
104104
if clusterType && conf.General.GenesisMethod == "file" {
105105
r := &replicationInitiator{
106106
logger: logger,
107-
secOpts: clusterConfig.SecOpts,
107+
secOpts: clusterClientConfig.SecOpts,
108108
bootstrapBlock: bootstrapBlock,
109109
conf: conf,
110110
lf: &ledgerFactory{lf},
@@ -129,25 +129,49 @@ func Start(cmd string, conf *localconfig.TopLevel) {
129129
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
130130
}
131131

132+
clusterServerConfig := serverConfig
133+
clusterGRPCServer := grpcServer
134+
if clusterType {
135+
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, grpcServer, ioutil.ReadFile)
136+
}
137+
138+
var servers = []*comm.GRPCServer{grpcServer}
139+
// If we have a separate gRPC server for the cluster, we need to update its TLS
140+
// CA certificate pool too.
141+
if clusterGRPCServer != grpcServer {
142+
servers = append(servers, clusterGRPCServer)
143+
}
144+
132145
tlsCallback := func(bundle *channelconfig.Bundle) {
133146
// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
134147
if grpcServer.MutualTLSRequired() || clusterType {
135148
logger.Debug("Executing callback to update root CAs")
136-
updateTrustedRoots(grpcServer, caSupport, bundle)
149+
updateTrustedRoots(caSupport, bundle, servers...)
137150
if clusterType {
138-
updateClusterDialer(caSupport, clusterDialer, clusterConfig.SecOpts.ServerRootCAs)
151+
updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
139152
}
140153
}
141154
}
142155

143-
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, lf, tlsCallback)
156+
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, lf, tlsCallback)
144157
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
145158
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS)
146159

147160
logger.Infof("Starting %s", metadata.GetVersionInfo())
148161
go handleSignals(addPlatformSignals(map[os.Signal]func(){
149-
syscall.SIGTERM: func() { grpcServer.Stop() },
162+
syscall.SIGTERM: func() {
163+
grpcServer.Stop()
164+
if clusterGRPCServer != grpcServer {
165+
clusterGRPCServer.Stop()
166+
}
167+
},
150168
}))
169+
170+
if clusterGRPCServer != grpcServer {
171+
logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
172+
go clusterGRPCServer.Start()
173+
}
174+
151175
initializeProfilingService(conf)
152176
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
153177
logger.Info("Beginning to serve requests")
@@ -190,7 +214,75 @@ func handleSignals(handlers map[os.Signal]func()) {
190214
}
191215
}
192216

193-
func initializeClusterConfig(conf *localconfig.TopLevel) comm.ClientConfig {
217+
type loadPEMFunc func(string) ([]byte, error)
218+
219+
// configureClusterListener gets a ServerConfig and a GRPCServer, and:
220+
// 1) If the TopLevel configuration states that the cluster configuration for the cluster gRPC service is missing, returns them back.
221+
// 2) Else, returns a new ServerConfig and a new gRPC server (with its own TLS listener on a different port).
222+
func configureClusterListener(conf *localconfig.TopLevel, generalConf comm.ServerConfig, generalSrv *comm.GRPCServer, loadPEM loadPEMFunc) (comm.ServerConfig, *comm.GRPCServer) {
223+
clusterConf := conf.General.Cluster
224+
// If listen address is not configured, or the TLS certificate isn't configured,
225+
// it means we use the general listener of the node.
226+
if clusterConf.ListenPort == 0 && clusterConf.ServerCertificate == "" && clusterConf.ListenAddress == "" && clusterConf.ServerPrivateKey == "" {
227+
logger.Info("Cluster listener is not configured, defaulting to use the general listener on port", conf.General.ListenPort)
228+
return generalConf, generalSrv
229+
}
230+
231+
// Else, one of the above is defined, so all 4 properties should be defined.
232+
if clusterConf.ListenPort == 0 || clusterConf.ServerCertificate == "" || clusterConf.ListenAddress == "" || clusterConf.ServerPrivateKey == "" {
233+
logger.Panic("Options: General.Cluster.ListenPort, General.Cluster.ListenAddress, General.Cluster.ServerCertificate," +
234+
" General.Cluster.ServerPrivateKey, should be defined altogether.")
235+
}
236+
237+
cert, err := loadPEM(clusterConf.ServerCertificate)
238+
if err != nil {
239+
logger.Panicf("Failed to load cluster server certificate from '%s' (%s)", clusterConf.ServerCertificate, err)
240+
}
241+
242+
key, err := loadPEM(clusterConf.ServerPrivateKey)
243+
if err != nil {
244+
logger.Panicf("Failed to load cluster server key from '%s' (%s)", clusterConf.ServerPrivateKey, err)
245+
}
246+
247+
port := fmt.Sprintf("%d", clusterConf.ListenPort)
248+
bindAddr := net.JoinHostPort(clusterConf.ListenAddress, port)
249+
250+
var clientRootCAs [][]byte
251+
for _, serverRoot := range conf.General.Cluster.RootCAs {
252+
rootCACert, err := loadPEM(serverRoot)
253+
if err != nil {
254+
logger.Panicf("Failed to load CA cert file '%s' (%s)",
255+
err, serverRoot)
256+
}
257+
clientRootCAs = append(clientRootCAs, rootCACert)
258+
}
259+
260+
serverConf := comm.ServerConfig{
261+
StreamInterceptors: generalConf.StreamInterceptors,
262+
UnaryInterceptors: generalConf.UnaryInterceptors,
263+
ConnectionTimeout: generalConf.ConnectionTimeout,
264+
MetricsProvider: generalConf.MetricsProvider,
265+
Logger: generalConf.Logger,
266+
KaOpts: generalConf.KaOpts,
267+
SecOpts: &comm.SecureOptions{
268+
CipherSuites: comm.DefaultTLSCipherSuites,
269+
ClientRootCAs: clientRootCAs,
270+
RequireClientCert: true,
271+
Certificate: cert,
272+
UseTLS: true,
273+
Key: key,
274+
},
275+
}
276+
277+
srv, err := comm.NewGRPCServer(bindAddr, serverConf)
278+
if err != nil {
279+
logger.Panicf("Failed creating gRPC server on %s:%d due to %v", clusterConf.ListenAddress, clusterConf.ListenPort, err)
280+
}
281+
282+
return serverConf, srv
283+
}
284+
285+
func initializeClusterClientConfig(conf *localconfig.TopLevel) comm.ClientConfig {
194286
cc := comm.ClientConfig{
195287
AsyncConnect: true,
196288
KaOpts: comm.DefaultKeepaliveOptions,
@@ -455,8 +547,7 @@ func newOperationsSystem(ops localconfig.Operations, metrics localconfig.Metrics
455547
})
456548
}
457549

458-
func updateTrustedRoots(srv *comm.GRPCServer, rootCASupport *comm.CASupport,
459-
cm channelconfig.Resources) {
550+
func updateTrustedRoots(rootCASupport *comm.CASupport, cm channelconfig.Resources, servers ...*comm.GRPCServer) {
460551
rootCASupport.Lock()
461552
defer rootCASupport.Unlock()
462553

@@ -541,12 +632,14 @@ func updateTrustedRoots(srv *comm.GRPCServer, rootCASupport *comm.CASupport,
541632
}
542633

543634
// now update the client roots for the gRPC server
544-
err = srv.SetClientRootCAs(trustedRoots)
545-
if err != nil {
546-
msg := "Failed to update trusted roots for orderer from latest config " +
547-
"block. This orderer may not be able to communicate " +
548-
"with members of channel %s (%s)"
549-
logger.Warningf(msg, cm.ConfigtxValidator().ChainID(), err)
635+
for _, srv := range servers {
636+
err = srv.SetClientRootCAs(trustedRoots)
637+
if err != nil {
638+
msg := "Failed to update trusted roots for orderer from latest config " +
639+
"block. This orderer may not be able to communicate " +
640+
"with members of channel %s (%s)"
641+
logger.Warningf(msg, cm.ConfigtxValidator().ChainID(), err)
642+
}
550643
}
551644
}
552645

0 commit comments

Comments
 (0)