From 33609c6244cc6adf1babb3aa76a7219a95dcf17d Mon Sep 17 00:00:00 2001 From: Maciej Gusiew Date: Thu, 30 Apr 2020 17:13:33 +0200 Subject: [PATCH] Implement same client cert check feature --- .gitignore | 4 ++ Gopkg.lock | 43 ++++++++++++++++ README.md | 17 ++++++ cmd/kafka-proxy/server.go | 3 ++ cmd/kafka-proxy/server_test.go | 90 ++++++++++++++++++++++++++++++++ config/config.go | 16 +++--- proxy/client.go | 24 +++++++++ proxy/tls.go | 80 +++++++++++++++++++++++++++++ proxy/tls_test.go | 94 +++++++++++++++++++++++++++++++--- proxy/util_test.go | 33 +++++++++--- 10 files changed, 382 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index 9efb7c20..1642ac4b 100644 --- a/.gitignore +++ b/.gitignore @@ -63,3 +63,7 @@ Session.vim *~ # Auto-generated tag files tags + +#IntelliJ +kafka-proxy.iml +vendor/ diff --git a/Gopkg.lock b/Gopkg.lock index 6b8782ad..f8ecc7b2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -74,6 +74,32 @@ revision = "925541529c1fa6821df4e44ce2723319eb2be768" version = "v1.0.0" +[[projects]] + digest = "1:a5200282cd07d920ad0499556b1e864401b6df768e90d38068cfbb313a0b763f" + name = "github.com/grepplabs/kafka-proxy" + packages = [ + "cmd/kafka-proxy", + "cmd/tools", + "config", + "pkg/apis", + "pkg/libs/googleid", + "pkg/libs/googleid-info", + "pkg/libs/googleid-provider", + "pkg/libs/util", + "pkg/registry", + "plugin/local-auth/proto", + "plugin/local-auth/shared", + "plugin/token-info/proto", + "plugin/token-info/shared", + "plugin/token-provider/proto", + "plugin/token-provider/shared", + "proxy", + "proxy/protocol", + ] + pruneopts = "UT" + revision = "1072f207f43ebaaef8bfb9b62a983934efff2fa0" + version = "v0.2.0" + [[projects]] branch = "master" digest = "1:07671f8997086ed115824d1974507d2b147d1e0463675ea5dbf3be89b1c2c563" @@ -533,6 +559,23 @@ "github.com/elazarl/goproxy/ext/auth", "github.com/fsnotify/fsnotify", "github.com/golang/protobuf/proto", + "github.com/grepplabs/kafka-proxy/cmd/kafka-proxy", + "github.com/grepplabs/kafka-proxy/cmd/tools", + "github.com/grepplabs/kafka-proxy/config", + "github.com/grepplabs/kafka-proxy/pkg/apis", + "github.com/grepplabs/kafka-proxy/pkg/libs/googleid", + "github.com/grepplabs/kafka-proxy/pkg/libs/googleid-info", + "github.com/grepplabs/kafka-proxy/pkg/libs/googleid-provider", + "github.com/grepplabs/kafka-proxy/pkg/libs/util", + "github.com/grepplabs/kafka-proxy/pkg/registry", + "github.com/grepplabs/kafka-proxy/plugin/local-auth/proto", + "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared", + "github.com/grepplabs/kafka-proxy/plugin/token-info/proto", + "github.com/grepplabs/kafka-proxy/plugin/token-info/shared", + "github.com/grepplabs/kafka-proxy/plugin/token-provider/proto", + "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared", + "github.com/grepplabs/kafka-proxy/proxy", + "github.com/grepplabs/kafka-proxy/proxy/protocol", "github.com/hashicorp/go-hclog", "github.com/hashicorp/go-multierror", "github.com/hashicorp/go-plugin", diff --git a/README.md b/README.md index 7bd67df1..2a1d4520 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,7 @@ See: --tls-client-key-password string Password to decrypt rsa private key --tls-enable Whether or not to use TLS when connecting to the broker --tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name + --same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file) ### Usage example @@ -209,6 +210,22 @@ SASL authentication is performed by the proxy. SASL authentication is enabled on --auth-local-param "--claim-sub=alice" \ --auth-local-param "--claim-sub=bob" \ --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" + +### Same client certificate check enabled example + +Validate that client certificate used by proxy client is exactly the same as client certificate in authentication initiated by proxy + + kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \ + --tls-enable \ + --tls-client-cert-file client.crt \ + --tls-client-key-file client.pem \ + --tls-client-key-password changeit \ + --proxy-listener-tls-enable \ + --proxy-listener-key-file server.pem \ + --proxy-listener-cert-file server.crt \ + --proxy-listener-key-password changeit \ + --proxy-listener-ca-chain-cert-file ca.crt \ + --same-client-cert-enable ### Kafka Gateway example diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 17565675..7bb04fa9 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -148,6 +148,9 @@ func initFlags() { Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key") Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file") + //Same TLS client cert + Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)") + // SASL by Proxy Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL") Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name") diff --git a/cmd/kafka-proxy/server_test.go b/cmd/kafka-proxy/server_test.go index 09bde703..74f3d940 100644 --- a/cmd/kafka-proxy/server_test.go +++ b/cmd/kafka-proxy/server_test.go @@ -147,3 +147,93 @@ func TestExternalServersMappingFromEnv(t *testing.T) { a.Equal(c.Proxy.ExternalServers[1].AdvertisedAddress, "kafka-5.grepplabs.com:9092") } + +func TestSameClientCertEnabledWithRequiredFlags(t *testing.T) { + + setupBootstrapServersMappingTest() + + args := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + //same client enabled attributes + "--same-client-cert-enable", "", + "--proxy-listener-tls-enable", "", + "--tls-enable", "", + "--tls-client-cert-file", "client.crt", + //other necessary tls arguments + "--proxy-listener-key-file", "server.pem", + "--proxy-listener-cert-file", "server.crt", + } + + _ = Server.ParseFlags(args) + err := Server.PreRunE(nil, args) + a := assert.New(t) + + a.Nil(err) +} + +func TestSameClientCertEnabledWithMissingFlags(t *testing.T) { + + expectedErrorMsg := "SameClientCertEnable requires TLS to be enabled on both proxy and kafka connections and client cert file on kafka connection" + + disabledProxyTLS := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + //same client enabled attributes + "--same-client-cert-enable", "", + "--tls-enable", "", + "--tls-client-cert-file", "client.crt", + //other necessary tls arguments + "--proxy-listener-key-file", "server.pem", + "--proxy-listener-cert-file", "server.crt", + } + + disabledTLS := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + //same client enabled attributes + "--same-client-cert-enable", "", + "--proxy-listener-tls-enable", "", + //other necessary tls arguments + "--proxy-listener-key-file", "server.pem", + "--proxy-listener-cert-file", "server.crt", + } + + missingTLSClientCert := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + //same client enabled attributes + "--same-client-cert-enable", "", + "--proxy-listener-tls-enable", "", + "--tls-enable", "", + //other necessary tls arguments + "--proxy-listener-key-file", "server.pem", + "--proxy-listener-cert-file", "server.crt", + } + + t.Run("DisabledProxyTLS", func(t *testing.T) { + serverPreRunFailure(t, disabledProxyTLS, expectedErrorMsg) + }) + + t.Run("DisabledTLS", func(t *testing.T) { + serverPreRunFailure(t, disabledTLS, expectedErrorMsg) + }) + + t.Run("MissingTLSClientCert", func(t *testing.T) { + serverPreRunFailure(t, missingTLSClientCert, expectedErrorMsg) + }) +} + +func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) { + setupBootstrapServersMappingTest() + + _ = Server.ParseFlags(cmdLineFlags) + err := Server.PreRunE(nil, cmdLineFlags) + a := assert.New(t) + + a.Equal(err.Error(), expectedErrorMsg) +} diff --git a/config/config.go b/config/config.go index 7f5dfc17..6736504b 100644 --- a/config/config.go +++ b/config/config.go @@ -113,12 +113,13 @@ type Config struct { ConnectionWriteBufferSize int // SO_SNDBUF TLS struct { - Enable bool - InsecureSkipVerify bool - ClientCertFile string - ClientKeyFile string - ClientKeyPassword string - CAChainCertFile string + Enable bool + InsecureSkipVerify bool + ClientCertFile string + ClientKeyFile string + ClientKeyPassword string + CAChainCertFile string + SameClientCertEnable bool } SASL struct { @@ -315,6 +316,9 @@ func (c *Config) Validate() error { if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") { return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled") } + if c.Kafka.TLS.SameClientCertEnable && (!c.Kafka.TLS.Enable || c.Kafka.TLS.ClientCertFile == "" || !c.Proxy.TLS.Enable) { + return errors.New("ClientCertFile is required on Kafka TLS and TLS must be enabled on both Proxy and Kafka connections when SameClientCertEnable is enabled") + } if c.Auth.Local.Enable && c.Auth.Local.Command == "" { return errors.New("Command is required when Auth.Local.Enable is enabled") } diff --git a/proxy/client.go b/proxy/client.go index 1a42076c..a7196205 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -2,6 +2,7 @@ package proxy import ( "crypto/tls" + "crypto/x509" "fmt" "github.com/grepplabs/kafka-proxy/config" "github.com/grepplabs/kafka-proxy/pkg/apis" @@ -39,6 +40,8 @@ type Client struct { authClient *AuthClient dialAddressMapping map[string]config.DialAddressMapping + + kafkaClientCert *x509.Certificate } func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) { @@ -46,6 +49,15 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne if err != nil { return nil, err } + + var kafkaClientCert *x509.Certificate = nil + if c.Kafka.TLS.SameClientCertEnable { + kafkaClientCert, err = parseCertificate(c.Kafka.TLS.ClientCertFile) + if err != nil { + return nil, err + } + } + dialer, err := newDialer(c, tlsConfig) if err != nil { return nil, err @@ -145,6 +157,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne ForbiddenApiKeys: forbiddenApiKeys, }, dialAddressMapping: dialAddressMapping, + kafkaClientCert: kafkaClientCert, }, nil } @@ -242,6 +255,17 @@ func (c *Client) Close() { } func (c *Client) handleConn(conn Conn) { + localConn := conn.LocalConnection + if c.kafkaClientCert != nil { + err := handshakeAsTLSAndValidateClientCert(localConn, c.kafkaClientCert, c.config.Kafka.DialTimeout) + + if err != nil { + logrus.Info(err.Error()) + _ = localConn.Close() + return + } + } + proxyConnectionsTotal.WithLabelValues(conn.BrokerAddress).Inc() dialAddress := conn.BrokerAddress diff --git a/proxy/tls.go b/proxy/tls.go index b6f188a8..457ebeb4 100644 --- a/proxy/tls.go +++ b/proxy/tls.go @@ -8,7 +8,9 @@ import ( "github.com/klauspost/cpuid" "github.com/pkg/errors" "io/ioutil" + "net" "strings" + "time" ) var ( @@ -58,6 +60,8 @@ var ( "ECDHE-RSA-3DES-EDE-CBC-SHA": tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, "RSA-3DES-EDE-CBC-SHA": tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA, } + + zeroTime = time.Time{} ) func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) { @@ -225,3 +229,79 @@ func decryptPEM(pemData []byte, password string) ([]byte, error) { } return pemData, nil } + +func parseCertificate(certFile string) (*x509.Certificate, error) { + + content, readErr := ioutil.ReadFile(certFile) + + if readErr != nil { + return nil, errors.Errorf("Failed to read file from location '%s'", certFile) + } + + block, _ := pem.Decode(content) + + cert, parseErr := x509.ParseCertificate(block.Bytes) + + if parseErr != nil { + return nil, errors.Errorf("Failed to parse certificate file from location '%s'", certFile) + } + + return cert, nil +} + +func handshakeAsTLSAndValidateClientCert(conn net.Conn, expectedCert *x509.Certificate, handshakeTimeout time.Duration) error { + tlsConn, ok := conn.(*tls.Conn) + if !ok { + return errors.New("Unable to cast connection to TLS when validating client cert") + } + + err := handshakeTLSConn(tlsConn, handshakeTimeout) + if err != nil { + return err + } + + actualClientCert := filterClientCertificate(tlsConn.ConnectionState().PeerCertificates) + + result := validateClientCert(actualClientCert, expectedCert) + + return result +} + +func handshakeTLSConn(tlsConn *tls.Conn, timeout time.Duration) error { + err := tlsConn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + return errors.Errorf("Failed to set deadline with handshake timeout in seconds %f on connection: %v", timeout.Seconds(), err) + } + + err = tlsConn.Handshake() + if err != nil { + return errors.Errorf("TLS handshake failed when exchanging client certificates: %v", err) + } + + err = tlsConn.SetDeadline(zeroTime) + if err != nil { + return errors.Errorf("Failed to reset deadline on connection: %v", err) + } + + return err +} + +func filterClientCertificate(peerCertificates []*x509.Certificate) *x509.Certificate { + for _, v := range peerCertificates { + if !v.IsCA { + return v + } + } + return nil +} + +func validateClientCert(actualClientCert *x509.Certificate, expectedCert *x509.Certificate) error { + if actualClientCert == nil { + return errors.New("Client cert not found in TLS connection") + } + + if !actualClientCert.Equal(expectedCert) { + return errors.New("Client cert sent by proxy client does not match brokers client cert (tls-client-cert-file)") + } + return nil +} diff --git a/proxy/tls_test.go b/proxy/tls_test.go index a4e3e09d..23209b66 100644 --- a/proxy/tls_test.go +++ b/proxy/tls_test.go @@ -61,7 +61,7 @@ func TestTLSUnknownAuthorityNoCAChainCert(t *testing.T) { c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name() c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name() - _, _, _, err := makeTLSPipe(c) + _, _, _, err := makeTLSPipe(c, nil) a.EqualError(err, "x509: certificate signed by unknown authority") } @@ -80,7 +80,7 @@ func TestTLSUnknownAuthorityWrongCAChainCert(t *testing.T) { // different bundle -> incorrect cert c.Kafka.TLS.CAChainCertFile = bundle2.ServerCert.Name() - _, _, _, err := makeTLSPipe(c) + _, _, _, err := makeTLSPipe(c, nil) a.EqualError(err, "x509: certificate signed by unknown authority") } @@ -95,7 +95,7 @@ func TestTLSInsecureSkipVerify(t *testing.T) { c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name() c.Kafka.TLS.InsecureSkipVerify = true - c1, c2, stop, err := makeTLSPipe(c) + c1, c2, stop, err := makeTLSPipe(c, nil) if err != nil { a.FailNow(err.Error()) } @@ -114,7 +114,7 @@ func TestTLSSelfSigned(t *testing.T) { c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name() c.Kafka.TLS.CAChainCertFile = bundle.ServerCert.Name() - c1, c2, stop, err := makeTLSPipe(c) + c1, c2, stop, err := makeTLSPipe(c, nil) if err != nil { a.FailNow(err.Error()) } @@ -260,7 +260,7 @@ func TestTLSVerifyClientCertDifferentCAs(t *testing.T) { c.Kafka.TLS.ClientCertFile = bundle2.ClientCert.Name() c.Kafka.TLS.ClientKeyFile = bundle2.ClientKey.Name() - c1, c2, stop, err := makeTLSPipe(c) + c1, c2, stop, err := makeTLSPipe(c, nil) if err != nil { a.FailNow(err.Error()) } @@ -283,7 +283,7 @@ func TestTLSVerifyClientCertSameCAs(t *testing.T) { c.Kafka.TLS.ClientCertFile = bundle1.ClientCert.Name() c.Kafka.TLS.ClientKeyFile = bundle1.ClientKey.Name() - c1, c2, stop, err := makeTLSPipe(c) + c1, c2, stop, err := makeTLSPipe(c, nil) if err != nil { a.FailNow(err.Error()) } @@ -304,7 +304,7 @@ func TestTLSMissingClientCert(t *testing.T) { c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name() - _, _, _, err := makeTLSPipe(c) + _, _, _, err := makeTLSPipe(c, nil) a.NotNil(err) a.Contains(err.Error(), "tls: client didn't provide a certificate") } @@ -326,12 +326,90 @@ func TestTLSBadClientCert(t *testing.T) { c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name() c.Kafka.TLS.ClientCertFile = bundle2.ClientCert.Name() c.Kafka.TLS.ClientKeyFile = bundle2.ClientKey.Name() - _, _, _, err := makeTLSPipe(c) + _, _, _, err := makeTLSPipe(c, nil) a.NotNil(err) a.Contains(err.Error(), "tls: failed to verify client's certificate") } +func TestTLSVerifySameClientCert(t *testing.T) { + + sameCertToCompare := true + differentCertToCompare := false + + bundle1 := NewCertsBundle() + defer bundle1.Close() + + bundle2 := NewCertsBundle() + defer bundle2.Close() + + t.Run("SameClientCertDisabledWithSameClientCerts", func(t *testing.T) { + c, clientCertFileToCheck := configWithCertToCompare(bundle1, bundle2, sameCertToCompare) + c.Kafka.TLS.SameClientCertEnable = false + successfulPingPong(t, c, clientCertFileToCheck) + }) + + t.Run("SameClientCertDisabledWithDifferentClientCerts", func(t *testing.T) { + c, clientCertFileToCheck := configWithCertToCompare(bundle1, bundle2, differentCertToCompare) + c.Kafka.TLS.SameClientCertEnable = false + successfulPingPong(t, c, clientCertFileToCheck) + }) + + t.Run("SameClientCertEnabledWithSameClientCerts", func(t *testing.T) { + c, clientCertFileToCheck := configWithCertToCompare(bundle1, bundle2, sameCertToCompare) + c.Kafka.TLS.SameClientCertEnable = true + successfulPingPong(t, c, clientCertFileToCheck) + }) + + t.Run("SameClientCertEnabledWithDifferentClientCerts", func(t *testing.T) { + c, clientCertFileToCheck := configWithCertToCompare(bundle1, bundle2, differentCertToCompare) + c.Kafka.TLS.SameClientCertEnable = true + pipelineSetupFailure(t, c, clientCertFileToCheck, "Client cert sent by proxy client does not match brokers client cert (tls-client-cert-file)") + }) +} + +func configWithCertToCompare(bundle1 *CertsBundle, bundle2 *CertsBundle, sameCertToCompare bool) (*config.Config, string) { + c := new(config.Config) + + c.Proxy.TLS.ListenerCertFile = bundle1.ServerCert.Name() + c.Proxy.TLS.ListenerKeyFile = bundle1.ServerKey.Name() + c.Proxy.TLS.CAChainCertFile = bundle2.CACert.Name() // client CA + + c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name() + c.Kafka.TLS.ClientCertFile = bundle2.ClientCert.Name() + c.Kafka.TLS.ClientKeyFile = bundle2.ClientKey.Name() //client cert + + if sameCertToCompare { + return c, bundle2.ClientCert.Name() + } + + return c, bundle1.ClientCert.Name() +} + +func successfulPingPong(t *testing.T, conf *config.Config, clientCertFileToCheck string) { + a := assert.New(t) + + clientCertToCheck, _ := parseCertificate(clientCertFileToCheck) + + c1, c2, stop, err := makeTLSPipe(conf, clientCertToCheck) + if err != nil { + a.FailNow(err.Error()) + } + defer stop() + pingPong(t, c1, c2) +} + +func pipelineSetupFailure(t *testing.T, conf *config.Config, clientCertFileToCheck string, expectedErrMsg string) { + a := assert.New(t) + + expectedClientCert, _ := parseCertificate(clientCertFileToCheck) + + _, _, _, err := makeTLSPipe(conf, expectedClientCert) + + a.NotNil(err) + a.Equal(err.Error(), expectedErrMsg) +} + func pingPong(t *testing.T, c1, c2 net.Conn) { a := assert.New(t) diff --git a/proxy/util_test.go b/proxy/util_test.go index 5e2414f7..2ae015df 100644 --- a/proxy/util_test.go +++ b/proxy/util_test.go @@ -26,7 +26,7 @@ type testAcceptResult struct { err error } -func localPipe(listener net.Listener, dialer proxy.Dialer, timeout time.Duration) (net.Conn, net.Conn, error) { +func localPipe(listener net.Listener, dialer proxy.Dialer, timeout time.Duration, expectedClientCert *x509.Certificate) (net.Conn, net.Conn, error) { acceptResultChannel := make(chan testAcceptResult, 1) go func() { conn, err := listener.Accept() @@ -37,6 +37,19 @@ func localPipe(listener net.Listener, dialer proxy.Dialer, timeout time.Duration } return } + + if expectedClientCert != nil { + err = handshakeAsTLSAndValidateClientCert(conn, expectedClientCert, timeout) + + if err != nil { + acceptResultChannel <- testAcceptResult{ + conn: conn, + err: err, + } + return + } + } + // will force handshake completion buf := make([]byte, 0) _, err = conn.Read(buf) @@ -71,7 +84,7 @@ func localPipe(listener net.Listener, dialer proxy.Dialer, timeout time.Duration return c1, acceptResult.conn, acceptResult.err } -func makeTLSPipe(conf *config.Config) (net.Conn, net.Conn, func(), error) { +func makeTLSPipe(conf *config.Config, expectedClientCert *x509.Certificate) (net.Conn, net.Conn, func(), error) { stop := func() {} serverConfig, err := newTLSListenerConfig(conf) @@ -82,6 +95,10 @@ func makeTLSPipe(conf *config.Config) (net.Conn, net.Conn, func(), error) { if err != nil { return nil, nil, stop, err } + var clientCertToCheck *x509.Certificate = nil + if conf.Kafka.TLS.SameClientCertEnable { + clientCertToCheck = expectedClientCert + } tlsListener, err := tls.Listen("tcp", "127.0.0.1:0", serverConfig) if err != nil { return nil, nil, stop, err @@ -94,7 +111,7 @@ func makeTLSPipe(conf *config.Config) (net.Conn, net.Conn, func(), error) { }, config: clientConfig, } - c1, c2, err := localPipe(tlsListener, tlsDialer, 4*time.Second) + c1, c2, err := localPipe(tlsListener, tlsDialer, 4*time.Second, clientCertToCheck) stop = func() { if c1 != nil { c1.Close() @@ -168,7 +185,7 @@ func makeTLSSocks5ProxyPipe(conf *config.Config, authenticator socks5.Authentica } }() - c1, c2, err := localPipe(tlsListener, tlsDialer, 4*time.Second) + c1, c2, err := localPipe(tlsListener, tlsDialer, 4*time.Second, nil) stop = func() { if c1 != nil { c1.Close() @@ -235,7 +252,7 @@ func makeTLSHttpProxyPipe(conf *config.Config, proxyusername, proxypassword stri _ = http.Serve(proxyListener, server) }() - c1, c2, err := localPipe(tlsListener, tlsDialer, 4*time.Second) + c1, c2, err := localPipe(tlsListener, tlsDialer, 4*time.Second, nil) stop = func() { if c1 != nil { c1.Close() @@ -261,7 +278,7 @@ func makePipe() (net.Conn, net.Conn, func(), error) { return nil, nil, stop, err } - c1, c2, err := localPipe(listener, dialer, 4*time.Second) + c1, c2, err := localPipe(listener, dialer, 4*time.Second, nil) stop = func() { if c1 != nil { c1.Close() @@ -305,7 +322,7 @@ func makeSocks5ProxyPipe() (net.Conn, net.Conn, func(), error) { } }() - c1, c2, err := localPipe(listener, socksDialer, 4*time.Second) + c1, c2, err := localPipe(listener, socksDialer, 4*time.Second, nil) stop = func() { if c1 != nil { c1.Close() @@ -348,7 +365,7 @@ func makeHttpProxyPipe() (net.Conn, net.Conn, func(), error) { _ = http.Serve(proxyListener, server) }() - c1, c2, err := localPipe(listener, httpProxyDialer.forwardDialer, 4*time.Second) + c1, c2, err := localPipe(listener, httpProxyDialer.forwardDialer, 4*time.Second, nil) stop = func() { if c1 != nil { c1.Close()