Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement same client cert check feature #42

Merged
merged 1 commit into from
May 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ Session.vim
*~
# Auto-generated tag files
tags

#IntelliJ

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is already existing IJ section at line 10. Merge your entries with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch ! Indeed this can be merged

kafka-proxy.iml
vendor/
43 changes: 43 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ See:
--tls-client-key-password string Password to decrypt rsa private key
--tls-enable Whether or not to use TLS when connecting to the broker
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
--same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the flag start with "--tls"?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • please change "matches" to "exactly matches" to avoid ambiguity.
  • reorder sentences:

Controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file). Use only when mutual TLS is enabled on proxy and broker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that, no problem


### Usage example

Expand Down Expand Up @@ -209,6 +210,22 @@ SASL authentication is performed by the proxy. SASL authentication is enabled on
--auth-local-param "--claim-sub=alice" \
--auth-local-param "--claim-sub=bob" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

### Same client certificate check enabled example
everesio marked this conversation as resolved.
Show resolved Hide resolved

Validate that client certificate used by proxy client is exactly the same as client certificate in authentication initiated by proxy

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
--tls-enable \
--tls-client-cert-file client.crt \
--tls-client-key-file client.pem \
--tls-client-key-password changeit \
--proxy-listener-tls-enable \
--proxy-listener-key-file server.pem \
--proxy-listener-cert-file server.crt \
--proxy-listener-key-password changeit \
--proxy-listener-ca-chain-cert-file ca.crt \
--same-client-cert-enable

### Kafka Gateway example

Expand Down
3 changes: 3 additions & 0 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func initFlags() {
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")

//Same TLS client cert
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)")

// SASL by Proxy
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL")
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")
Expand Down
90 changes: 90 additions & 0 deletions cmd/kafka-proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,93 @@ func TestExternalServersMappingFromEnv(t *testing.T) {
a.Equal(c.Proxy.ExternalServers[1].AdvertisedAddress, "kafka-5.grepplabs.com:9092")

}

func TestSameClientCertEnabledWithRequiredFlags(t *testing.T) {

setupBootstrapServersMappingTest()

args := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
//same client enabled attributes
"--same-client-cert-enable", "",
"--proxy-listener-tls-enable", "",
"--tls-enable", "",
"--tls-client-cert-file", "client.crt",
//other necessary tls arguments
"--proxy-listener-key-file", "server.pem",
"--proxy-listener-cert-file", "server.crt",
}

_ = Server.ParseFlags(args)
err := Server.PreRunE(nil, args)
a := assert.New(t)

a.Nil(err)
}

func TestSameClientCertEnabledWithMissingFlags(t *testing.T) {

expectedErrorMsg := "SameClientCertEnable requires TLS to be enabled on both proxy and kafka connections and client cert file on kafka connection"

disabledProxyTLS := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
//same client enabled attributes
"--same-client-cert-enable", "",
"--tls-enable", "",
"--tls-client-cert-file", "client.crt",
//other necessary tls arguments
"--proxy-listener-key-file", "server.pem",
"--proxy-listener-cert-file", "server.crt",
}

disabledTLS := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
//same client enabled attributes
"--same-client-cert-enable", "",
"--proxy-listener-tls-enable", "",
//other necessary tls arguments
"--proxy-listener-key-file", "server.pem",
"--proxy-listener-cert-file", "server.crt",
}

missingTLSClientCert := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
//same client enabled attributes
"--same-client-cert-enable", "",
"--proxy-listener-tls-enable", "",
"--tls-enable", "",
//other necessary tls arguments
"--proxy-listener-key-file", "server.pem",
"--proxy-listener-cert-file", "server.crt",
}

t.Run("DisabledProxyTLS", func(t *testing.T) {
serverPreRunFailure(t, disabledProxyTLS, expectedErrorMsg)
})

t.Run("DisabledTLS", func(t *testing.T) {
serverPreRunFailure(t, disabledTLS, expectedErrorMsg)
})

t.Run("MissingTLSClientCert", func(t *testing.T) {
serverPreRunFailure(t, missingTLSClientCert, expectedErrorMsg)
})
}

func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) {
setupBootstrapServersMappingTest()

_ = Server.ParseFlags(cmdLineFlags)
err := Server.PreRunE(nil, cmdLineFlags)
a := assert.New(t)

a.Equal(err.Error(), expectedErrorMsg)
}
16 changes: 10 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ type Config struct {
ConnectionWriteBufferSize int // SO_SNDBUF

TLS struct {
Enable bool
InsecureSkipVerify bool
ClientCertFile string
ClientKeyFile string
ClientKeyPassword string
CAChainCertFile string
Enable bool
InsecureSkipVerify bool
ClientCertFile string
ClientKeyFile string
ClientKeyPassword string
CAChainCertFile string
SameClientCertEnable bool
}

SASL struct {
Expand Down Expand Up @@ -315,6 +316,9 @@ func (c *Config) Validate() error {
if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") {
return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled")
}
if c.Kafka.TLS.SameClientCertEnable && (!c.Kafka.TLS.Enable || c.Kafka.TLS.ClientCertFile == "" || !c.Proxy.TLS.Enable) {
return errors.New("ClientCertFile is required on Kafka TLS and TLS must be enabled on both Proxy and Kafka connections when SameClientCertEnable is enabled")
}
if c.Auth.Local.Enable && c.Auth.Local.Command == "" {
return errors.New("Command is required when Auth.Local.Enable is enabled")
}
Expand Down
24 changes: 24 additions & 0 deletions proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/pkg/apis"
Expand Down Expand Up @@ -39,13 +40,24 @@ type Client struct {
authClient *AuthClient

dialAddressMapping map[string]config.DialAddressMapping

kafkaClientCert *x509.Certificate
}

func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
tlsConfig, err := newTLSClientConfig(c)
if err != nil {
return nil, err
}

var kafkaClientCert *x509.Certificate = nil
if c.Kafka.TLS.SameClientCertEnable {
kafkaClientCert, err = parseCertificate(c.Kafka.TLS.ClientCertFile)
if err != nil {
return nil, err
}
}

dialer, err := newDialer(c, tlsConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -145,6 +157,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
ForbiddenApiKeys: forbiddenApiKeys,
},
dialAddressMapping: dialAddressMapping,
kafkaClientCert: kafkaClientCert,
}, nil
}

Expand Down Expand Up @@ -242,6 +255,17 @@ func (c *Client) Close() {
}

func (c *Client) handleConn(conn Conn) {
localConn := conn.LocalConnection
if c.kafkaClientCert != nil {
err := handshakeAsTLSAndValidateClientCert(localConn, c.kafkaClientCert, c.config.Kafka.DialTimeout)

if err != nil {
logrus.Info(err.Error())
_ = localConn.Close()
return
}
}

proxyConnectionsTotal.WithLabelValues(conn.BrokerAddress).Inc()

dialAddress := conn.BrokerAddress
Expand Down
80 changes: 80 additions & 0 deletions proxy/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/klauspost/cpuid"
"github.com/pkg/errors"
"io/ioutil"
"net"
"strings"
"time"
)

var (
Expand Down Expand Up @@ -58,6 +60,8 @@ var (
"ECDHE-RSA-3DES-EDE-CBC-SHA": tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
"RSA-3DES-EDE-CBC-SHA": tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
}

zeroTime = time.Time{}
)

func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
Expand Down Expand Up @@ -225,3 +229,79 @@ func decryptPEM(pemData []byte, password string) ([]byte, error) {
}
return pemData, nil
}

func parseCertificate(certFile string) (*x509.Certificate, error) {

content, readErr := ioutil.ReadFile(certFile)

if readErr != nil {
return nil, errors.Errorf("Failed to read file from location '%s'", certFile)
}

block, _ := pem.Decode(content)
mgusiew-guide marked this conversation as resolved.
Show resolved Hide resolved

cert, parseErr := x509.ParseCertificate(block.Bytes)

if parseErr != nil {
return nil, errors.Errorf("Failed to parse certificate file from location '%s'", certFile)
}

return cert, nil
}

func handshakeAsTLSAndValidateClientCert(conn net.Conn, expectedCert *x509.Certificate, handshakeTimeout time.Duration) error {
tlsConn, ok := conn.(*tls.Conn)
if !ok {
return errors.New("Unable to cast connection to TLS when validating client cert")
}

err := handshakeTLSConn(tlsConn, handshakeTimeout)
if err != nil {
return err
}

actualClientCert := filterClientCertificate(tlsConn.ConnectionState().PeerCertificates)

result := validateClientCert(actualClientCert, expectedCert)

return result
}

func handshakeTLSConn(tlsConn *tls.Conn, timeout time.Duration) error {
err := tlsConn.SetDeadline(time.Now().Add(timeout))
if err != nil {
return errors.Errorf("Failed to set deadline with handshake timeout in seconds %f on connection: %v", timeout.Seconds(), err)
}

err = tlsConn.Handshake()
if err != nil {
return errors.Errorf("TLS handshake failed when exchanging client certificates: %v", err)
}

err = tlsConn.SetDeadline(zeroTime)
if err != nil {
return errors.Errorf("Failed to reset deadline on connection: %v", err)
}

return err
}

func filterClientCertificate(peerCertificates []*x509.Certificate) *x509.Certificate {
for _, v := range peerCertificates {
if !v.IsCA {
everesio marked this conversation as resolved.
Show resolved Hide resolved
return v
}
}
return nil
}

func validateClientCert(actualClientCert *x509.Certificate, expectedCert *x509.Certificate) error {
if actualClientCert == nil {
return errors.New("Client cert not found in TLS connection")
}

if !actualClientCert.Equal(expectedCert) {
return errors.New("Client cert sent by proxy client does not match brokers client cert (tls-client-cert-file)")
}
return nil
}
Loading