Skip to content

Commit

Permalink
Merge pull request #20 from grepplabs/oauthbearer
Browse files Browse the repository at this point in the history
OAuth Authentication via SASL/OAUTHBEARER
  • Loading branch information
everesio committed Dec 1, 2018
2 parents f3f0b40 + d93a8b4 commit ac9f4c8
Show file tree
Hide file tree
Showing 32 changed files with 1,098 additions and 295 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -3,7 +3,7 @@ sudo: false
language: go

go:
- "1.10.x"
- "1.11.x"

env:
global:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.build
@@ -1,4 +1,4 @@
FROM golang:1.10 as builder
FROM golang:1.11 as builder

ARG GOOS=linux
ARG GOARCH=amd64
Expand Down
19 changes: 13 additions & 6 deletions Makefile
Expand Up @@ -8,7 +8,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.0.8"
TAG ?= "v0.1.0"
GOARCH ?= amd64
GOOS ?= linux

Expand Down Expand Up @@ -46,11 +46,11 @@ release: clean
protoc.local-auth:
protoc -I plugin/local-auth/proto/ plugin/local-auth/proto/auth.proto --go_out=plugins=grpc:plugin/local-auth/proto/

protoc.gateway-client:
protoc -I plugin/gateway-client/proto/ plugin/gateway-client/proto/token-provider.proto --go_out=plugins=grpc:plugin/gateway-client/proto/
protoc.token-provider:
protoc -I plugin/token-provider/proto/ plugin/token-provider/proto/token-provider.proto --go_out=plugins=grpc:plugin/token-provider/proto/

protoc.gateway-server:
protoc -I plugin/gateway-server/proto/ plugin/gateway-server/proto/token-info.proto --go_out=plugins=grpc:plugin/gateway-server/proto/
protoc.token-info:
protoc -I plugin/token-info/proto/ plugin/token-info/proto/token-info.proto --go_out=plugins=grpc:plugin/token-info/proto/

plugin.auth-user:
CGO_ENABLED=0 go build -o build/auth-user $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-user/main.go
Expand All @@ -64,7 +64,14 @@ plugin.google-id-provider:
plugin.google-id-info:
CGO_ENABLED=0 go build -o build/google-id-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-googleid-info/main.go

all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info
plugin.unsecured-jwt-info:
CGO_ENABLED=0 go build -o build/unsecured-jwt-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-info/main.go

plugin.unsecured-jwt-provider:
CGO_ENABLED=0 go build -o build/unsecured-jwt-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-provider/main.go


all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info plugin.unsecured-jwt-provider

clean:
@rm -rf build
42 changes: 35 additions & 7 deletions README.md
Expand Up @@ -34,11 +34,11 @@ See:

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_linux_amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_linux_amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_darwin_amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_darwin_amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand Down Expand Up @@ -75,6 +75,7 @@ See:
--auth-local-command string Path to authentication plugin binary
--auth-local-enable Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers
--auth-local-log-level string Log level of the auth plugin (default "trace")
--auth-local-mechanism string SASL mechanism used for local authentication: PLAIN or OAUTHBEARER (default "PLAIN")
--auth-local-param stringArray Authentication plugin parameter
--auth-local-timeout duration Authentication timeout (default 10s)
--bootstrap-server-mapping stringArray Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))
Expand All @@ -84,7 +85,7 @@ See:
--dynamic-listeners-disable Disable dynamic listeners.
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
--forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
--forward-proxy string URL of the forward proxy. Supported schemas are http and socks5
--forward-proxy string URL of the forward proxy. Supported schemas are socks5 and http
-h, --help help for server
--http-disable Disable HTTP endpoints
--http-health-path string Path on which to health endpoint (default "/health")
Expand Down Expand Up @@ -112,9 +113,15 @@ See:
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
--proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096)
--sasl-enable Connect using SASL/PLAIN
--sasl-enable Connect using SASL
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
--sasl-password string SASL user password
--sasl-plugin-command string Path to authentication plugin binary
--sasl-plugin-enable Use plugin for SASL authentication
--sasl-plugin-log-level string Log level of the auth plugin (default "trace")
--sasl-plugin-mechanism string SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER (default "OAUTHBEARER")
--sasl-plugin-param stringArray Authentication plugin parameter
--sasl-plugin-timeout duration Authentication timeout (default 10s)
--sasl-username string SASL user name
--tls-ca-chain-cert-file string PEM encoded CA's certificate file
--tls-client-cert-file string PEM encoded file with client certificate
Expand All @@ -123,8 +130,6 @@ See:
--tls-enable Whether or not to use TLS when connecting to the broker
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name



### Usage example

kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,0.0.0.0:32399"
Expand All @@ -144,14 +149,29 @@ See:
--external-server-mapping "192.168.99.100:32402,127.0.0.1:32403" \
--forbidden-api-keys 20


export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server

### SASL authentication initiated by proxy example

SASL authentication is initiated by the proxy. SASL authentication is disabled on the clients and enabled on the Kafka brokers.

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
--tls-enable --tls-insecure-skip-verify \
--sasl-enable --sasl-username myuser --sasl-password mysecret

export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server
make clean build plugin.unsecured-jwt-provider && build/kafka-proxy server \
--sasl-enable \
--sasl-plugin-enable \
--sasl-plugin-mechanism "OAUTHBEARER" \
--sasl-plugin-command build/unsecured-jwt-provider \
--sasl-plugin-param "--claim-sub=alice" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

### Proxy authentication example

SASL authentication is performed by the proxy. SASL authentication is enabled on the clients and disabled on the Kafka brokers.

make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \
--proxy-listener-cert-file "server-cert.pem" \
--proxy-listener-ca-chain-cert-file "ca.pem" \
Expand All @@ -169,6 +189,14 @@ See:
--auth-local-param "--user-attr=uid" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

make clean build plugin.unsecured-jwt-info && build/kafka-proxy server \
--auth-local-enable \
--auth-local-command build/unsecured-jwt-info \
--auth-local-mechanism "OAUTHBEARER" \
--auth-local-param "--claim-sub=alice" \
--auth-local-param "--claim-sub=bob" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

### Kafka Gateway example

Authentication between Kafka Proxy Client and Kafka Proxy Server with Google-ID (service account JWT)
Expand Down
151 changes: 114 additions & 37 deletions cmd/kafka-proxy/server.go
Expand Up @@ -21,9 +21,9 @@ import (

"errors"
"github.com/grepplabs/kafka-proxy/pkg/apis"
gatewayclient "github.com/grepplabs/kafka-proxy/plugin/gateway-client/shared"
gatewayserver "github.com/grepplabs/kafka-proxy/plugin/gateway-server/shared"
localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"strings"
Expand Down Expand Up @@ -101,6 +101,7 @@ func initFlags() {
// local authentication plugin
Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers")
Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary")
Server.Flags().StringVar(&c.Auth.Local.Mechanism, "auth-local-mechanism", "PLAIN", "SASL mechanism used for local authentication: PLAIN or OAUTHBEARER")
Server.Flags().StringArrayVar(&c.Auth.Local.Parameters, "auth-local-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Auth.Local.LogLevel, "auth-local-log-level", "trace", "Log level of the auth plugin")
Server.Flags().DurationVar(&c.Auth.Local.Timeout, "auth-local-timeout", 10*time.Second, "Authentication timeout")
Expand Down Expand Up @@ -142,12 +143,20 @@ func initFlags() {
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")

// SASL
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL/PLAIN")
// SASL by Proxy
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL")
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")
Server.Flags().StringVar(&c.Kafka.SASL.Password, "sasl-password", "", "SASL user password")
Server.Flags().StringVar(&c.Kafka.SASL.JaasConfigFile, "sasl-jaas-config-file", "", "Location of JAAS config file with SASL username and password")

// SASL by Proxy plugin
Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication")
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Command, "sasl-plugin-command", "", "Path to authentication plugin binary")
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Mechanism, "sasl-plugin-mechanism", "OAUTHBEARER", "SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER")
Server.Flags().StringArrayVar(&c.Kafka.SASL.Plugin.Parameters, "sasl-plugin-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin")
Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout")

// Web
Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints")
Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on")
Expand All @@ -172,47 +181,115 @@ func initFlags() {
func Run(_ *cobra.Command, _ []string) {
logrus.Infof("Starting kafka-proxy version %s", config.Version)

var passwordAuthenticator apis.PasswordAuthenticator
var localPasswordAuthenticator apis.PasswordAuthenticator
var localTokenAuthenticator apis.TokenInfo
if c.Auth.Local.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory)
if ok {
logrus.Infof("Using built-in PasswordAuthenticator")
passwordAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
switch c.Auth.Local.Mechanism {
case "PLAIN":
var err error
factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory)
if ok {
logrus.Infof("Using built-in '%s' PasswordAuthenticator for local PasswordAuthenticator", c.Auth.Local.Command)
localPasswordAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("passwordAuthenticator")
if err != nil {
logrus.Fatal(err)
}
localPasswordAuthenticator, ok = raw.(apis.PasswordAuthenticator)
if !ok {
logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type"))
}
}
raw, err := rpcClient.Dispense("passwordAuthenticator")
if err != nil {
logrus.Fatal(err)
case "OAUTHBEARER":
var err error
factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Local.Command).(apis.TokenInfoFactory)
if ok {
logrus.Infof("Using built-in '%s' TokenInfo for local TokenAuthenticator", c.Auth.Local.Command)

localTokenAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("tokenInfo")
if err != nil {
logrus.Fatal(err)
}
localTokenAuthenticator, ok = raw.(apis.TokenInfo)
if !ok {
logrus.Fatal(errors.New("unsupported TokenInfo plugin type"))
}
}
passwordAuthenticator, ok = raw.(apis.PasswordAuthenticator)
if !ok {
logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type"))
default:
logrus.Fatal(errors.New("unsupported local auth mechanism"))
}
}

var saslTokenProvider apis.TokenProvider
if c.Kafka.SASL.Plugin.Enable {
switch c.Kafka.SASL.Plugin.Mechanism {
case "OAUTHBEARER":
var err error
factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Kafka.SASL.Plugin.Command).(apis.TokenProviderFactory)
if ok {
logrus.Infof("Using built-in '%s' TokenProvider for sasl authentication", c.Kafka.SASL.Plugin.Command)

saslTokenProvider, err = factory.New(c.Kafka.SASL.Plugin.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Kafka.SASL.Plugin.LogLevel, c.Kafka.SASL.Plugin.Command, c.Kafka.SASL.Plugin.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("tokenProvider")
if err != nil {
logrus.Fatal(err)
}
saslTokenProvider, ok = raw.(apis.TokenProvider)
if !ok {
logrus.Fatal(errors.New("unsupported TokenProvider plugin type"))
}
}
default:
logrus.Fatal(errors.New("unsupported sasl auth mechanism"))
}
}

var tokenProvider apis.TokenProvider
var gatewayTokenProvider apis.TokenProvider
if c.Auth.Gateway.Client.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Auth.Gateway.Client.Command).(apis.TokenProviderFactory)
if ok {
logrus.Infof("Using built-in TokenProvider")
tokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters)
logrus.Infof("Using built-in '%s' TokenProvider for Gateway Client", c.Auth.Gateway.Client.Command)
gatewayTokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(gatewayclient.Handshake, gatewayclient.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters)
client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
Expand All @@ -223,26 +300,26 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
tokenProvider, ok = raw.(apis.TokenProvider)
gatewayTokenProvider, ok = raw.(apis.TokenProvider)
if !ok {
logrus.Fatal(errors.New("unsupported TokenProvider plugin type"))
}
}
}

var tokenInfo apis.TokenInfo
var gatewayTokenInfo apis.TokenInfo
if c.Auth.Gateway.Server.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Gateway.Server.Command).(apis.TokenInfoFactory)
if ok {
logrus.Infof("Using built-in TokenInfo")
logrus.Infof("Using built-in '%s' TokenInfo for Gateway Server", c.Auth.Gateway.Server.Command)

tokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters)
gatewayTokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(gatewayserver.Handshake, gatewayserver.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters)
client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
Expand All @@ -253,7 +330,7 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
tokenInfo, ok = raw.(apis.TokenInfo)
gatewayTokenInfo, ok = raw.(apis.TokenInfo)
if !ok {
logrus.Fatal(errors.New("unsupported TokenInfo plugin type"))
}
Expand All @@ -273,7 +350,7 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, passwordAuthenticator, tokenProvider, tokenInfo)
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, saslTokenProvider, gatewayTokenProvider, gatewayTokenInfo)
if err != nil {
logrus.Fatal(err)
}
Expand Down

0 comments on commit ac9f4c8

Please sign in to comment.