From 889a01828a3892261db7f84201173b351845a0ea Mon Sep 17 00:00:00 2001 From: Michal Budzyn Date: Sun, 21 Oct 2018 21:15:21 +0200 Subject: [PATCH] Local auth localTokenAuthenticator --- README.md | 1 + cmd/kafka-proxy/server.go | 106 +++++++++++++++++++++++++------------- config/config.go | 4 ++ proxy/client.go | 24 +++++---- proxy/sasl_local.go | 22 +++++--- proxy/sasl_local_auth.go | 18 +++++-- 6 files changed, 117 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index fd886346..f97a09a4 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ See: --auth-local-command string Path to authentication plugin binary --auth-local-enable Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers --auth-local-log-level string Log level of the auth plugin (default "trace") + --auth-local-mechanism string SASL mechanism used for local authentication: PLAIN or OAUTHBEARER (default "PLAIN") --auth-local-param stringArray Authentication plugin parameter --auth-local-timeout duration Authentication timeout (default 10s) --bootstrap-server-mapping stringArray Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport)) diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 795b1354..400ebaee 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -21,9 +21,9 @@ import ( "errors" "github.com/grepplabs/kafka-proxy/pkg/apis" - gatewayclient "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared" - gatewayserver "github.com/grepplabs/kafka-proxy/plugin/token-info/shared" localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" + tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared" + tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "strings" @@ -101,6 +101,7 @@ func initFlags() { // local authentication plugin Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers") Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary") + Server.Flags().StringVar(&c.Auth.Local.Mechanism, "auth-local-mechanism", "PLAIN", "SASL mechanism used for local authentication: PLAIN or OAUTHBEARER") Server.Flags().StringArrayVar(&c.Auth.Local.Parameters, "auth-local-param", []string{}, "Authentication plugin parameter") Server.Flags().StringVar(&c.Auth.Local.LogLevel, "auth-local-log-level", "trace", "Log level of the auth plugin") Server.Flags().DurationVar(&c.Auth.Local.Timeout, "auth-local-timeout", 10*time.Second, "Authentication timeout") @@ -172,47 +173,80 @@ func initFlags() { func Run(_ *cobra.Command, _ []string) { logrus.Infof("Starting kafka-proxy version %s", config.Version) - var passwordAuthenticator apis.PasswordAuthenticator + var localPasswordAuthenticator apis.PasswordAuthenticator + var localTokenAuthenticator apis.TokenInfo if c.Auth.Local.Enable { - var err error - factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory) - if ok { - logrus.Infof("Using built-in PasswordAuthenticator") - passwordAuthenticator, err = factory.New(c.Auth.Local.Parameters) - if err != nil { - logrus.Fatal(err) + switch c.Auth.Local.Mechanism { + case "PLAIN": + var err error + factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory) + if ok { + logrus.Infof("Using built-in '%s' PasswordAuthenticator for local PasswordAuthenticator", c.Auth.Local.Command) + localPasswordAuthenticator, err = factory.New(c.Auth.Local.Parameters) + if err != nil { + logrus.Fatal(err) + } + } else { + client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters) + defer client.Kill() + + rpcClient, err := client.Client() + if err != nil { + logrus.Fatal(err) + } + raw, err := rpcClient.Dispense("passwordAuthenticator") + if err != nil { + logrus.Fatal(err) + } + localPasswordAuthenticator, ok = raw.(apis.PasswordAuthenticator) + if !ok { + logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type")) + } } - } else { - client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters) - defer client.Kill() - - rpcClient, err := client.Client() - if err != nil { - logrus.Fatal(err) - } - raw, err := rpcClient.Dispense("passwordAuthenticator") - if err != nil { - logrus.Fatal(err) - } - passwordAuthenticator, ok = raw.(apis.PasswordAuthenticator) - if !ok { - logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type")) + case "OAUTHBEARER": + var err error + factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Local.Command).(apis.TokenInfoFactory) + if ok { + logrus.Infof("Using built-in '%s' TokenInfo for local TokenAuthenticator", c.Auth.Local.Command) + + localTokenAuthenticator, err = factory.New(c.Auth.Local.Parameters) + if err != nil { + logrus.Fatal(err) + } + } else { + client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters) + defer client.Kill() + + rpcClient, err := client.Client() + if err != nil { + logrus.Fatal(err) + } + raw, err := rpcClient.Dispense("tokenInfo") + if err != nil { + logrus.Fatal(err) + } + localTokenAuthenticator, ok = raw.(apis.TokenInfo) + if !ok { + logrus.Fatal(errors.New("unsupported TokenInfo plugin type")) + } } + default: + logrus.Fatal(errors.New("unsupported local auth mechanism")) } } - var tokenProvider apis.TokenProvider + var gatewayTokenProvider apis.TokenProvider if c.Auth.Gateway.Client.Enable { var err error factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Auth.Gateway.Client.Command).(apis.TokenProviderFactory) if ok { - logrus.Infof("Using built-in TokenProvider") - tokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters) + logrus.Infof("Using built-in '%s' TokenProvider for Gateway Client", c.Auth.Gateway.Client.Command) + gatewayTokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters) if err != nil { logrus.Fatal(err) } } else { - client := NewPluginClient(gatewayclient.Handshake, gatewayclient.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters) + client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters) defer client.Kill() rpcClient, err := client.Client() @@ -223,26 +257,26 @@ func Run(_ *cobra.Command, _ []string) { if err != nil { logrus.Fatal(err) } - tokenProvider, ok = raw.(apis.TokenProvider) + gatewayTokenProvider, ok = raw.(apis.TokenProvider) if !ok { logrus.Fatal(errors.New("unsupported TokenProvider plugin type")) } } } - var tokenInfo apis.TokenInfo + var gatewayTokenInfo apis.TokenInfo if c.Auth.Gateway.Server.Enable { var err error factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Gateway.Server.Command).(apis.TokenInfoFactory) if ok { - logrus.Infof("Using built-in TokenInfo") + logrus.Infof("Using built-in '%s' TokenInfo for Gateway Server", c.Auth.Gateway.Server.Command) - tokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters) + gatewayTokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters) if err != nil { logrus.Fatal(err) } } else { - client := NewPluginClient(gatewayserver.Handshake, gatewayserver.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters) + client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters) defer client.Kill() rpcClient, err := client.Client() @@ -253,7 +287,7 @@ func Run(_ *cobra.Command, _ []string) { if err != nil { logrus.Fatal(err) } - tokenInfo, ok = raw.(apis.TokenInfo) + gatewayTokenInfo, ok = raw.(apis.TokenInfo) if !ok { logrus.Fatal(errors.New("unsupported TokenInfo plugin type")) } @@ -273,7 +307,7 @@ func Run(_ *cobra.Command, _ []string) { if err != nil { logrus.Fatal(err) } - proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, passwordAuthenticator, tokenProvider, tokenInfo) + proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, gatewayTokenProvider, gatewayTokenInfo) if err != nil { logrus.Fatal(err) } diff --git a/config/config.go b/config/config.go index 51cb1b47..55a117e8 100644 --- a/config/config.go +++ b/config/config.go @@ -66,6 +66,7 @@ type Config struct { Local struct { Enable bool Command string + Mechanism string Parameters []string LogLevel string Timeout time.Duration @@ -254,6 +255,9 @@ func (c *Config) Validate() error { if c.Auth.Local.Enable && c.Auth.Local.Command == "" { return errors.New("Command is required when Auth.Local.Enable is enabled") } + if c.Auth.Local.Enable && (c.Auth.Local.Mechanism != "PLAIN" && c.Auth.Local.Mechanism != "OAUTHBEARER") { + return errors.New("Mechanism PLAIN or OAUTHBEARER is required when Auth.Local.Enable is enabled") + } if c.Auth.Local.Enable && c.Auth.Local.Timeout <= 0 { return errors.New("Auth.Local.Timeout must be greater than 0") } diff --git a/proxy/client.go b/proxy/client.go index 312d7689..2c3b1d9a 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -38,7 +38,7 @@ type Client struct { authClient *AuthClient } -func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, passwordAuthenticator apis.PasswordAuthenticator, tokenProvider apis.TokenProvider, tokenInfo apis.TokenInfo) (*Client, error) { +func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) { tlsConfig, err := newTLSClientConfig(c) if err != nil { return nil, err @@ -60,14 +60,14 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne forbiddenApiKeys[int16(apiKey)] = struct{}{} } } - if c.Auth.Local.Enable && passwordAuthenticator == nil { - return nil, errors.New("Auth.Local.Enable is enabled but passwordAuthenticator is nil") + if c.Auth.Local.Enable && (localPasswordAuthenticator == nil && localTokenAuthenticator == nil) { + return nil, errors.New("Auth.Local.Enable is enabled but passwordAuthenticator and localTokenAuthenticator are nil") } - if c.Auth.Gateway.Client.Enable && tokenProvider == nil { + if c.Auth.Gateway.Client.Enable && gatewayTokenProvider == nil { return nil, errors.New("Auth.Gateway.Client.Enable is enabled but tokenProvider is nil") } - if c.Auth.Gateway.Server.Enable && tokenInfo == nil { + if c.Auth.Gateway.Server.Enable && gatewayTokenInfo == nil { return nil, errors.New("Auth.Gateway.Server.Enable is enabled but tokenInfo is nil") } @@ -84,7 +84,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne magic: c.Auth.Gateway.Client.Magic, method: c.Auth.Gateway.Client.Method, timeout: c.Auth.Gateway.Client.Timeout, - tokenProvider: tokenProvider, + tokenProvider: gatewayTokenProvider, }, processorConfig: ProcessorConfig{ MaxOpenRequests: c.Kafka.MaxOpenRequests, @@ -93,16 +93,18 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne ResponseBufferSize: c.Proxy.ResponseBufferSize, ReadTimeout: c.Kafka.ReadTimeout, WriteTimeout: c.Kafka.WriteTimeout, - LocalSasl: NewLocalSasl( - c.Auth.Local.Enable, - c.Auth.Local.Timeout, - passwordAuthenticator), + LocalSasl: NewLocalSasl(LocalSaslParams{ + enabled: c.Auth.Local.Enable, + timeout: c.Auth.Local.Timeout, + passwordAuthenticator: localPasswordAuthenticator, + tokenAuthenticator: localTokenAuthenticator, + }), AuthServer: &AuthServer{ enabled: c.Auth.Gateway.Server.Enable, magic: c.Auth.Gateway.Server.Magic, method: c.Auth.Gateway.Server.Method, timeout: c.Auth.Gateway.Server.Timeout, - tokenInfo: tokenInfo, + tokenInfo: gatewayTokenInfo, }, ForbiddenApiKeys: forbiddenApiKeys, }}, nil diff --git a/proxy/sasl_local.go b/proxy/sasl_local.go index e0300ae7..6387972c 100644 --- a/proxy/sasl_local.go +++ b/proxy/sasl_local.go @@ -17,15 +17,25 @@ type LocalSasl struct { localAuthenticators map[string]LocalSaslAuth } -func NewLocalSasl(enabled bool, timeout time.Duration, passwordAuthenticator apis.PasswordAuthenticator) *LocalSasl { +type LocalSaslParams struct { + enabled bool + timeout time.Duration + passwordAuthenticator apis.PasswordAuthenticator + tokenAuthenticator apis.TokenInfo +} + +func NewLocalSasl(params LocalSaslParams) *LocalSasl { localAuthenticators := make(map[string]LocalSaslAuth) - if passwordAuthenticator != nil { - localAuthenticators[SASLPlain] = NewLocalSaslPlain(passwordAuthenticator) + if params.passwordAuthenticator != nil { + localAuthenticators[SASLPlain] = NewLocalSaslPlain(params.passwordAuthenticator) + } + + if params.tokenAuthenticator != nil { + localAuthenticators[SASLOAuthBearer] = NewLocalSaslOauth(params.tokenAuthenticator) } - localAuthenticators[SASLOAuthBearer] = NewLocalSaslOauth() return &LocalSasl{ - enabled: enabled, - timeout: timeout, + enabled: params.enabled, + timeout: params.timeout, localAuthenticators: localAuthenticators, } } diff --git a/proxy/sasl_local_auth.go b/proxy/sasl_local_auth.go index c31ae531..e4fa9efb 100644 --- a/proxy/sasl_local_auth.go +++ b/proxy/sasl_local_auth.go @@ -1,6 +1,7 @@ package proxy import ( + "context" "fmt" "github.com/grepplabs/kafka-proxy/pkg/apis" "github.com/grepplabs/kafka-proxy/proxy/protocol" @@ -47,12 +48,14 @@ func (p *LocalSaslPlain) doLocalAuth(saslAuthBytes []byte) (err error) { } type LocalSaslOauth struct { - saslOAuthBearer SaslOAuthBearer + saslOAuthBearer SaslOAuthBearer + tokenAuthenticator apis.TokenInfo } -func NewLocalSaslOauth() *LocalSaslOauth { +func NewLocalSaslOauth(tokenAuthenticator apis.TokenInfo) *LocalSaslOauth { return &LocalSaslOauth{ - saslOAuthBearer: SaslOAuthBearer{}, + saslOAuthBearer: SaslOAuthBearer{}, + tokenAuthenticator: tokenAuthenticator, } } @@ -62,7 +65,12 @@ func (p *LocalSaslOauth) doLocalAuth(saslAuthBytes []byte) (err error) { if err != nil { return err } - //TODO: implement TokenAuthenticator - _ = token + resp, err := p.tokenAuthenticator.VerifyToken(context.Background(), apis.VerifyRequest{Token: token}) + if err != nil { + return err + } + if !resp.Success { + return fmt.Errorf("verify token failed with status: %d", resp.Status) + } return nil }