From d93a8b4e6186d5771e5156b5991236835b0efae0 Mon Sep 17 00:00:00 2001 From: Michal Budzyn Date: Sun, 25 Nov 2018 14:00:04 +0100 Subject: [PATCH] Config proxy initiated oauthbearer auth. Implement unsecured-jwt-provider plugin --- Makefile | 6 +- README.md | 33 +++++++-- cmd/kafka-proxy/server.go | 49 ++++++++++++- cmd/plugin-unsecured-jwt-info/main.go | 4 +- cmd/plugin-unsecured-jwt-provider/main.go | 83 +++++++++++++++++++++++ config/config.go | 30 +++++++- proxy/client.go | 24 +++++-- proxy/sasl_by_proxy.go | 10 +++ 8 files changed, 220 insertions(+), 19 deletions(-) create mode 100644 cmd/plugin-unsecured-jwt-provider/main.go diff --git a/Makefile b/Makefile index 84fb290d..0e9fb393 100644 --- a/Makefile +++ b/Makefile @@ -67,7 +67,11 @@ plugin.google-id-info: plugin.unsecured-jwt-info: CGO_ENABLED=0 go build -o build/unsecured-jwt-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-info/main.go -all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info +plugin.unsecured-jwt-provider: + CGO_ENABLED=0 go build -o build/unsecured-jwt-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-provider/main.go + + +all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info plugin.unsecured-jwt-provider clean: @rm -rf build diff --git a/README.md b/README.md index 72c0b3af..bd3b5cbd 100644 --- a/README.md +++ b/README.md @@ -34,11 +34,11 @@ See: Linux - curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_linux_amd64.tar.gz | tar xz + curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_linux_amd64.tar.gz | tar xz macOS - curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_darwin_amd64.tar.gz | tar xz + curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_darwin_amd64.tar.gz | tar xz 2. Move the binary in to your PATH. @@ -85,7 +85,7 @@ See: --dynamic-listeners-disable Disable dynamic listeners. --external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started --forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics - --forward-proxy string URL of the forward proxy. Supported schemas are http and socks5 + --forward-proxy string URL of the forward proxy. Supported schemas are socks5 and http -h, --help help for server --http-disable Disable HTTP endpoints --http-health-path string Path on which to health endpoint (default "/health") @@ -113,9 +113,15 @@ See: --proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used --proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096) --proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096) - --sasl-enable Connect using SASL/PLAIN + --sasl-enable Connect using SASL --sasl-jaas-config-file string Location of JAAS config file with SASL username and password --sasl-password string SASL user password + --sasl-plugin-command string Path to authentication plugin binary + --sasl-plugin-enable Use plugin for SASL authentication + --sasl-plugin-log-level string Log level of the auth plugin (default "trace") + --sasl-plugin-mechanism string SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER (default "OAUTHBEARER") + --sasl-plugin-param stringArray Authentication plugin parameter + --sasl-plugin-timeout duration Authentication timeout (default 10s) --sasl-username string SASL user name --tls-ca-chain-cert-file string PEM encoded CA's certificate file --tls-client-cert-file string PEM encoded file with client certificate @@ -124,8 +130,6 @@ See: --tls-enable Whether or not to use TLS when connecting to the broker --tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name - - ### Usage example kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,0.0.0.0:32399" @@ -145,14 +149,29 @@ See: --external-server-mapping "192.168.99.100:32402,127.0.0.1:32403" \ --forbidden-api-keys 20 + + export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server + +### SASL authentication initiated by proxy example + +SASL authentication is initiated by the proxy. SASL authentication is disabled on the clients and enabled on the Kafka brokers. + kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \ --tls-enable --tls-insecure-skip-verify \ --sasl-enable --sasl-username myuser --sasl-password mysecret - export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server + make clean build plugin.unsecured-jwt-provider && build/kafka-proxy server \ + --sasl-enable \ + --sasl-plugin-enable \ + --sasl-plugin-mechanism "OAUTHBEARER" \ + --sasl-plugin-command build/unsecured-jwt-provider \ + --sasl-plugin-param "--claim-sub=alice" \ + --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" ### Proxy authentication example +SASL authentication is performed by the proxy. SASL authentication is enabled on the clients and disabled on the Kafka brokers. + make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \ --proxy-listener-cert-file "server-cert.pem" \ --proxy-listener-ca-chain-cert-file "ca.pem" \ diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 400ebaee..f7a18dbc 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -143,12 +143,20 @@ func initFlags() { Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key") Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file") - // SASL - Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL/PLAIN") + // SASL by Proxy + Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL") Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name") Server.Flags().StringVar(&c.Kafka.SASL.Password, "sasl-password", "", "SASL user password") Server.Flags().StringVar(&c.Kafka.SASL.JaasConfigFile, "sasl-jaas-config-file", "", "Location of JAAS config file with SASL username and password") + // SASL by Proxy plugin + Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication") + Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Command, "sasl-plugin-command", "", "Path to authentication plugin binary") + Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Mechanism, "sasl-plugin-mechanism", "OAUTHBEARER", "SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER") + Server.Flags().StringArrayVar(&c.Kafka.SASL.Plugin.Parameters, "sasl-plugin-param", []string{}, "Authentication plugin parameter") + Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin") + Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout") + // Web Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints") Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on") @@ -235,6 +243,41 @@ func Run(_ *cobra.Command, _ []string) { } } + var saslTokenProvider apis.TokenProvider + if c.Kafka.SASL.Plugin.Enable { + switch c.Kafka.SASL.Plugin.Mechanism { + case "OAUTHBEARER": + var err error + factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Kafka.SASL.Plugin.Command).(apis.TokenProviderFactory) + if ok { + logrus.Infof("Using built-in '%s' TokenProvider for sasl authentication", c.Kafka.SASL.Plugin.Command) + + saslTokenProvider, err = factory.New(c.Kafka.SASL.Plugin.Parameters) + if err != nil { + logrus.Fatal(err) + } + } else { + client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Kafka.SASL.Plugin.LogLevel, c.Kafka.SASL.Plugin.Command, c.Kafka.SASL.Plugin.Parameters) + defer client.Kill() + + rpcClient, err := client.Client() + if err != nil { + logrus.Fatal(err) + } + raw, err := rpcClient.Dispense("tokenProvider") + if err != nil { + logrus.Fatal(err) + } + saslTokenProvider, ok = raw.(apis.TokenProvider) + if !ok { + logrus.Fatal(errors.New("unsupported TokenProvider plugin type")) + } + } + default: + logrus.Fatal(errors.New("unsupported sasl auth mechanism")) + } + } + var gatewayTokenProvider apis.TokenProvider if c.Auth.Gateway.Client.Enable { var err error @@ -307,7 +350,7 @@ func Run(_ *cobra.Command, _ []string) { if err != nil { logrus.Fatal(err) } - proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, gatewayTokenProvider, gatewayTokenInfo) + proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, saslTokenProvider, gatewayTokenProvider, gatewayTokenInfo) if err != nil { logrus.Fatal(err) } diff --git a/cmd/plugin-unsecured-jwt-info/main.go b/cmd/plugin-unsecured-jwt-info/main.go index c88cdc9d..adf53da1 100644 --- a/cmd/plugin-unsecured-jwt-info/main.go +++ b/cmd/plugin-unsecured-jwt-info/main.go @@ -45,6 +45,7 @@ type pluginMeta struct { func (f *pluginMeta) flagSet() *flag.FlagSet { fs := flag.NewFlagSet("unsecured-jwt-info info settings", flag.ContinueOnError) + fs.Var(&f.claimSub, "claim-sub", "Allowed subject claim (user name)") return fs } @@ -134,8 +135,7 @@ func getVerifyResponseResponse(status int) (apis.VerifyResponse, error) { func main() { pluginMeta := &pluginMeta{} fs := pluginMeta.flagSet() - fs.Var(&pluginMeta.claimSub, "claim-sub", "Allowed subject claim (user name)") - fs.Parse(os.Args[1:]) + _ = fs.Parse(os.Args[1:]) logrus.Infof("Unsecured JWT sub claims: %v", pluginMeta.claimSub) diff --git a/cmd/plugin-unsecured-jwt-provider/main.go b/cmd/plugin-unsecured-jwt-provider/main.go new file mode 100644 index 00000000..b91276e4 --- /dev/null +++ b/cmd/plugin-unsecured-jwt-provider/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "flag" + "github.com/grepplabs/kafka-proxy/pkg/apis" + "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared" + "github.com/hashicorp/go-plugin" + "github.com/sirupsen/logrus" + "golang.org/x/oauth2/jws" + "os" +) + +const ( + StatusOK = 0 + StatusEncodeError = 1 + AlgorithmNone = "none" +) + +type UnsecuredJWTProvider struct { + claimSub string +} + +func (v UnsecuredJWTProvider) GetToken(ctx context.Context, request apis.TokenRequest) (apis.TokenResponse, error) { + token, err := v.encodeToken() + if err != nil { + return getGetTokenResponse(StatusEncodeError, "") + } + + return getGetTokenResponse(StatusOK, token) +} + +func getGetTokenResponse(status int, token string) (apis.TokenResponse, error) { + success := status == StatusOK + return apis.TokenResponse{Success: success, Status: int32(status), Token: token}, nil +} + +func (v UnsecuredJWTProvider) encodeToken() (string, error) { + header := &jws.Header{ + Algorithm: AlgorithmNone, + } + claims := &jws.ClaimSet{ + Sub: v.claimSub, + } + signer := func(data []byte) (sig []byte, err error) { + return []byte{}, nil + } + return jws.EncodeWithSigner(header, claims, signer) +} + +type pluginMeta struct { + claimSub string +} + +func (f *pluginMeta) flagSet() *flag.FlagSet { + fs := flag.NewFlagSet("unsecured-jwt-info info settings", flag.ContinueOnError) + fs.StringVar(&f.claimSub, "claim-sub", "", "subject claim") + return fs +} + +func main() { + pluginMeta := &pluginMeta{} + flags := pluginMeta.flagSet() + _ = flags.Parse(os.Args[1:]) + + if pluginMeta.claimSub == "" { + logrus.Errorf("parameter claim-sub is required") + os.Exit(1) + } + + unsecuredJWTProvider := &UnsecuredJWTProvider{ + claimSub: pluginMeta.claimSub, + } + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: shared.Handshake, + Plugins: map[string]plugin.Plugin{ + "unsecuredJWTProvider": &shared.TokenProviderPlugin{Impl: unsecuredJWTProvider}, + }, + // A non-nil value here enables gRPC serving for this plugin... + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/config/config.go b/config/config.go index 55a117e8..332020f3 100644 --- a/config/config.go +++ b/config/config.go @@ -120,6 +120,14 @@ type Config struct { Username string Password string JaasConfigFile string + Plugin struct { + Enable bool + Command string + Mechanism string + Parameters []string + LogLevel string + Timeout time.Duration + } } } ForwardProxy struct { @@ -211,8 +219,26 @@ func NewConfig() *Config { } func (c *Config) Validate() error { - if c.Kafka.SASL.Enable && (c.Kafka.SASL.Username == "" || c.Kafka.SASL.Password == "") { - return errors.New("SASL.Username and SASL.Password are required when SASL is enabled") + if c.Kafka.SASL.Enable { + if c.Kafka.SASL.Plugin.Enable { + if c.Kafka.SASL.Plugin.Command == "" { + return errors.New("Command is required when Kafka.SASL.Plugin.Enable is enabled") + } + if c.Kafka.SASL.Plugin.Timeout <= 0 { + return errors.New("Kafka.SASL.Plugin.Timeout must be greater than 0") + } + if c.Kafka.SASL.Plugin.Mechanism != "OAUTHBEARER" { + return errors.New("Mechanism OAUTHBEARER is required when Kafka.SASL.Plugin.Enable is enabled") + } + } else { + if c.Kafka.SASL.Username == "" || c.Kafka.SASL.Password == "" { + return errors.New("SASL.Username and SASL.Password are required when SASL is enabled and plugin is not used") + } + } + } else { + if c.Kafka.SASL.Plugin.Enable { + return errors.New("Kafka.SASL.Plugin.Enable must be disabled, when SASL is disabled") + } } if c.Kafka.KeepAlive < 0 { return errors.New("KeepAlive must be greater or equal 0") diff --git a/proxy/client.go b/proxy/client.go index 2ccbff9d..c3594849 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -38,7 +38,7 @@ type Client struct { authClient *AuthClient } -func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) { +func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) { tlsConfig, err := newTLSClientConfig(c) if err != nil { return nil, err @@ -70,15 +70,31 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne if c.Auth.Gateway.Server.Enable && gatewayTokenInfo == nil { return nil, errors.New("Auth.Gateway.Server.Enable is enabled but tokenInfo is nil") } + var saslAuthByProxy SASLAuthByProxy + if c.Kafka.SASL.Plugin.Enable { + if c.Kafka.SASL.Plugin.Mechanism == SASLOAuthBearer && saslTokenProvider != nil { + saslAuthByProxy = &SASLOAuthBearerAuth{ + clientID: c.Kafka.ClientID, + writeTimeout: c.Kafka.WriteTimeout, + readTimeout: c.Kafka.ReadTimeout, + tokenProvider: saslTokenProvider, + } + } else { + return nil, errors.Errorf("SASLAuthByProxy plugin unsupported or plugin misconfiguration for mechanism '%s' ", c.Kafka.SASL.Plugin.Mechanism) + } - return &Client{conns: conns, config: c, dialer: dialer, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1), - saslAuthByProxy: &SASLPlainAuth{ + } else { + saslAuthByProxy = &SASLPlainAuth{ clientID: c.Kafka.ClientID, writeTimeout: c.Kafka.WriteTimeout, readTimeout: c.Kafka.ReadTimeout, username: c.Kafka.SASL.Username, password: c.Kafka.SASL.Password, - }, + } + } + + return &Client{conns: conns, config: c, dialer: dialer, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1), + saslAuthByProxy: saslAuthByProxy, authClient: &AuthClient{ enabled: c.Auth.Gateway.Client.Enable, magic: c.Auth.Gateway.Client.Magic, diff --git a/proxy/sasl_by_proxy.go b/proxy/sasl_by_proxy.go index b06cb358..8516215f 100644 --- a/proxy/sasl_by_proxy.go +++ b/proxy/sasl_by_proxy.go @@ -8,6 +8,7 @@ import ( "github.com/grepplabs/kafka-proxy/pkg/apis" "github.com/grepplabs/kafka-proxy/proxy/protocol" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "io" "time" ) @@ -77,6 +78,12 @@ func (b *SASLPlainAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter) error if handshakeErr != nil { return handshakeErr } + return b.sendSaslAuthenticateRequest(conn) +} + +func (b *SASLPlainAuth) sendSaslAuthenticateRequest(conn DeadlineReaderWriter) error { + logrus.Debugf("Sending authentication opaque packets, mechanism PLAIN") + length := 1 + len(b.username) + 1 + len(b.password) authBytes := make([]byte, length+4) //4 byte length header + auth data binary.BigEndian.PutUint32(authBytes, uint32(length)) @@ -110,6 +117,7 @@ func (b *SASLPlainAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter) error } func (b *SASLHandshake) sendAndReceiveHandshake(conn DeadlineReaderWriter) error { + logrus.Debugf("Sending SaslHandshakeRequest") req := &protocol.Request{ ClientID: b.clientID, @@ -195,6 +203,8 @@ func (b *SASLOAuthBearerAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter) } func (b *SASLOAuthBearerAuth) sendSaslAuthenticateRequest(token string, conn DeadlineReaderWriter) error { + logrus.Debugf("Sending SaslAuthenticateRequest, mechanism OAUTHBEARER") + saslAuthReqV0 := protocol.SaslAuthenticateRequestV0{SaslAuthBytes: SaslOAuthBearer{}.ToBytes(token, "", make(map[string]string, 0))} req := &protocol.Request{