Skip to content

Commit

Permalink
Local auth localTokenAuthenticator
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Oct 21, 2018
1 parent 7a50ff5 commit 889a018
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 58 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -75,6 +75,7 @@ See:
--auth-local-command string Path to authentication plugin binary
--auth-local-enable Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers
--auth-local-log-level string Log level of the auth plugin (default "trace")
--auth-local-mechanism string SASL mechanism used for local authentication: PLAIN or OAUTHBEARER (default "PLAIN")
--auth-local-param stringArray Authentication plugin parameter
--auth-local-timeout duration Authentication timeout (default 10s)
--bootstrap-server-mapping stringArray Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))
Expand Down
106 changes: 70 additions & 36 deletions cmd/kafka-proxy/server.go
Expand Up @@ -21,9 +21,9 @@ import (

"errors"
"github.com/grepplabs/kafka-proxy/pkg/apis"
gatewayclient "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
gatewayserver "github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"strings"
Expand Down Expand Up @@ -101,6 +101,7 @@ func initFlags() {
// local authentication plugin
Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers")
Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary")
Server.Flags().StringVar(&c.Auth.Local.Mechanism, "auth-local-mechanism", "PLAIN", "SASL mechanism used for local authentication: PLAIN or OAUTHBEARER")
Server.Flags().StringArrayVar(&c.Auth.Local.Parameters, "auth-local-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Auth.Local.LogLevel, "auth-local-log-level", "trace", "Log level of the auth plugin")
Server.Flags().DurationVar(&c.Auth.Local.Timeout, "auth-local-timeout", 10*time.Second, "Authentication timeout")
Expand Down Expand Up @@ -172,47 +173,80 @@ func initFlags() {
func Run(_ *cobra.Command, _ []string) {
logrus.Infof("Starting kafka-proxy version %s", config.Version)

var passwordAuthenticator apis.PasswordAuthenticator
var localPasswordAuthenticator apis.PasswordAuthenticator
var localTokenAuthenticator apis.TokenInfo
if c.Auth.Local.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory)
if ok {
logrus.Infof("Using built-in PasswordAuthenticator")
passwordAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
switch c.Auth.Local.Mechanism {
case "PLAIN":
var err error
factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory)
if ok {
logrus.Infof("Using built-in '%s' PasswordAuthenticator for local PasswordAuthenticator", c.Auth.Local.Command)
localPasswordAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("passwordAuthenticator")
if err != nil {
logrus.Fatal(err)
}
localPasswordAuthenticator, ok = raw.(apis.PasswordAuthenticator)
if !ok {
logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type"))
}
}
} else {
client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("passwordAuthenticator")
if err != nil {
logrus.Fatal(err)
}
passwordAuthenticator, ok = raw.(apis.PasswordAuthenticator)
if !ok {
logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type"))
case "OAUTHBEARER":
var err error
factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Local.Command).(apis.TokenInfoFactory)
if ok {
logrus.Infof("Using built-in '%s' TokenInfo for local TokenAuthenticator", c.Auth.Local.Command)

localTokenAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("tokenInfo")
if err != nil {
logrus.Fatal(err)
}
localTokenAuthenticator, ok = raw.(apis.TokenInfo)
if !ok {
logrus.Fatal(errors.New("unsupported TokenInfo plugin type"))
}
}
default:
logrus.Fatal(errors.New("unsupported local auth mechanism"))
}
}

var tokenProvider apis.TokenProvider
var gatewayTokenProvider apis.TokenProvider
if c.Auth.Gateway.Client.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Auth.Gateway.Client.Command).(apis.TokenProviderFactory)
if ok {
logrus.Infof("Using built-in TokenProvider")
tokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters)
logrus.Infof("Using built-in '%s' TokenProvider for Gateway Client", c.Auth.Gateway.Client.Command)
gatewayTokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(gatewayclient.Handshake, gatewayclient.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters)
client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
Expand All @@ -223,26 +257,26 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
tokenProvider, ok = raw.(apis.TokenProvider)
gatewayTokenProvider, ok = raw.(apis.TokenProvider)
if !ok {
logrus.Fatal(errors.New("unsupported TokenProvider plugin type"))
}
}
}

var tokenInfo apis.TokenInfo
var gatewayTokenInfo apis.TokenInfo
if c.Auth.Gateway.Server.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Gateway.Server.Command).(apis.TokenInfoFactory)
if ok {
logrus.Infof("Using built-in TokenInfo")
logrus.Infof("Using built-in '%s' TokenInfo for Gateway Server", c.Auth.Gateway.Server.Command)

tokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters)
gatewayTokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(gatewayserver.Handshake, gatewayserver.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters)
client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
Expand All @@ -253,7 +287,7 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
tokenInfo, ok = raw.(apis.TokenInfo)
gatewayTokenInfo, ok = raw.(apis.TokenInfo)
if !ok {
logrus.Fatal(errors.New("unsupported TokenInfo plugin type"))
}
Expand All @@ -273,7 +307,7 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, passwordAuthenticator, tokenProvider, tokenInfo)
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, gatewayTokenProvider, gatewayTokenInfo)
if err != nil {
logrus.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Expand Up @@ -66,6 +66,7 @@ type Config struct {
Local struct {
Enable bool
Command string
Mechanism string
Parameters []string
LogLevel string
Timeout time.Duration
Expand Down Expand Up @@ -254,6 +255,9 @@ func (c *Config) Validate() error {
if c.Auth.Local.Enable && c.Auth.Local.Command == "" {
return errors.New("Command is required when Auth.Local.Enable is enabled")
}
if c.Auth.Local.Enable && (c.Auth.Local.Mechanism != "PLAIN" && c.Auth.Local.Mechanism != "OAUTHBEARER") {
return errors.New("Mechanism PLAIN or OAUTHBEARER is required when Auth.Local.Enable is enabled")
}
if c.Auth.Local.Enable && c.Auth.Local.Timeout <= 0 {
return errors.New("Auth.Local.Timeout must be greater than 0")
}
Expand Down
24 changes: 13 additions & 11 deletions proxy/client.go
Expand Up @@ -38,7 +38,7 @@ type Client struct {
authClient *AuthClient
}

func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, passwordAuthenticator apis.PasswordAuthenticator, tokenProvider apis.TokenProvider, tokenInfo apis.TokenInfo) (*Client, error) {
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
tlsConfig, err := newTLSClientConfig(c)
if err != nil {
return nil, err
Expand All @@ -60,14 +60,14 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
forbiddenApiKeys[int16(apiKey)] = struct{}{}
}
}
if c.Auth.Local.Enable && passwordAuthenticator == nil {
return nil, errors.New("Auth.Local.Enable is enabled but passwordAuthenticator is nil")
if c.Auth.Local.Enable && (localPasswordAuthenticator == nil && localTokenAuthenticator == nil) {
return nil, errors.New("Auth.Local.Enable is enabled but passwordAuthenticator and localTokenAuthenticator are nil")
}

if c.Auth.Gateway.Client.Enable && tokenProvider == nil {
if c.Auth.Gateway.Client.Enable && gatewayTokenProvider == nil {
return nil, errors.New("Auth.Gateway.Client.Enable is enabled but tokenProvider is nil")
}
if c.Auth.Gateway.Server.Enable && tokenInfo == nil {
if c.Auth.Gateway.Server.Enable && gatewayTokenInfo == nil {
return nil, errors.New("Auth.Gateway.Server.Enable is enabled but tokenInfo is nil")
}

Expand All @@ -84,7 +84,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
magic: c.Auth.Gateway.Client.Magic,
method: c.Auth.Gateway.Client.Method,
timeout: c.Auth.Gateway.Client.Timeout,
tokenProvider: tokenProvider,
tokenProvider: gatewayTokenProvider,
},
processorConfig: ProcessorConfig{
MaxOpenRequests: c.Kafka.MaxOpenRequests,
Expand All @@ -93,16 +93,18 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
ResponseBufferSize: c.Proxy.ResponseBufferSize,
ReadTimeout: c.Kafka.ReadTimeout,
WriteTimeout: c.Kafka.WriteTimeout,
LocalSasl: NewLocalSasl(
c.Auth.Local.Enable,
c.Auth.Local.Timeout,
passwordAuthenticator),
LocalSasl: NewLocalSasl(LocalSaslParams{
enabled: c.Auth.Local.Enable,
timeout: c.Auth.Local.Timeout,
passwordAuthenticator: localPasswordAuthenticator,
tokenAuthenticator: localTokenAuthenticator,
}),
AuthServer: &AuthServer{
enabled: c.Auth.Gateway.Server.Enable,
magic: c.Auth.Gateway.Server.Magic,
method: c.Auth.Gateway.Server.Method,
timeout: c.Auth.Gateway.Server.Timeout,
tokenInfo: tokenInfo,
tokenInfo: gatewayTokenInfo,
},
ForbiddenApiKeys: forbiddenApiKeys,
}}, nil
Expand Down
22 changes: 16 additions & 6 deletions proxy/sasl_local.go
Expand Up @@ -17,15 +17,25 @@ type LocalSasl struct {
localAuthenticators map[string]LocalSaslAuth
}

func NewLocalSasl(enabled bool, timeout time.Duration, passwordAuthenticator apis.PasswordAuthenticator) *LocalSasl {
type LocalSaslParams struct {
enabled bool
timeout time.Duration
passwordAuthenticator apis.PasswordAuthenticator
tokenAuthenticator apis.TokenInfo
}

func NewLocalSasl(params LocalSaslParams) *LocalSasl {
localAuthenticators := make(map[string]LocalSaslAuth)
if passwordAuthenticator != nil {
localAuthenticators[SASLPlain] = NewLocalSaslPlain(passwordAuthenticator)
if params.passwordAuthenticator != nil {
localAuthenticators[SASLPlain] = NewLocalSaslPlain(params.passwordAuthenticator)
}

if params.tokenAuthenticator != nil {
localAuthenticators[SASLOAuthBearer] = NewLocalSaslOauth(params.tokenAuthenticator)
}
localAuthenticators[SASLOAuthBearer] = NewLocalSaslOauth()
return &LocalSasl{
enabled: enabled,
timeout: timeout,
enabled: params.enabled,
timeout: params.timeout,
localAuthenticators: localAuthenticators,
}
}
Expand Down
18 changes: 13 additions & 5 deletions proxy/sasl_local_auth.go
@@ -1,6 +1,7 @@
package proxy

import (
"context"
"fmt"
"github.com/grepplabs/kafka-proxy/pkg/apis"
"github.com/grepplabs/kafka-proxy/proxy/protocol"
Expand Down Expand Up @@ -47,12 +48,14 @@ func (p *LocalSaslPlain) doLocalAuth(saslAuthBytes []byte) (err error) {
}

type LocalSaslOauth struct {
saslOAuthBearer SaslOAuthBearer
saslOAuthBearer SaslOAuthBearer
tokenAuthenticator apis.TokenInfo
}

func NewLocalSaslOauth() *LocalSaslOauth {
func NewLocalSaslOauth(tokenAuthenticator apis.TokenInfo) *LocalSaslOauth {
return &LocalSaslOauth{
saslOAuthBearer: SaslOAuthBearer{},
saslOAuthBearer: SaslOAuthBearer{},
tokenAuthenticator: tokenAuthenticator,
}
}

Expand All @@ -62,7 +65,12 @@ func (p *LocalSaslOauth) doLocalAuth(saslAuthBytes []byte) (err error) {
if err != nil {
return err
}
//TODO: implement TokenAuthenticator
_ = token
resp, err := p.tokenAuthenticator.VerifyToken(context.Background(), apis.VerifyRequest{Token: token})
if err != nil {
return err
}
if !resp.Success {
return fmt.Errorf("verify token failed with status: %d", resp.Status)
}
return nil
}

0 comments on commit 889a018

Please sign in to comment.