Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ sudo: false
language: go

go:
- "1.10.x"
- "1.11.x"

env:
global:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.build
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.10 as builder
FROM golang:1.11 as builder

ARG GOOS=linux
ARG GOARCH=amd64
Expand Down
19 changes: 13 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.0.8"
TAG ?= "v0.1.0"
GOARCH ?= amd64
GOOS ?= linux

Expand Down Expand Up @@ -46,11 +46,11 @@ release: clean
protoc.local-auth:
protoc -I plugin/local-auth/proto/ plugin/local-auth/proto/auth.proto --go_out=plugins=grpc:plugin/local-auth/proto/

protoc.gateway-client:
protoc -I plugin/gateway-client/proto/ plugin/gateway-client/proto/token-provider.proto --go_out=plugins=grpc:plugin/gateway-client/proto/
protoc.token-provider:
protoc -I plugin/token-provider/proto/ plugin/token-provider/proto/token-provider.proto --go_out=plugins=grpc:plugin/token-provider/proto/

protoc.gateway-server:
protoc -I plugin/gateway-server/proto/ plugin/gateway-server/proto/token-info.proto --go_out=plugins=grpc:plugin/gateway-server/proto/
protoc.token-info:
protoc -I plugin/token-info/proto/ plugin/token-info/proto/token-info.proto --go_out=plugins=grpc:plugin/token-info/proto/

plugin.auth-user:
CGO_ENABLED=0 go build -o build/auth-user $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-user/main.go
Expand All @@ -64,7 +64,14 @@ plugin.google-id-provider:
plugin.google-id-info:
CGO_ENABLED=0 go build -o build/google-id-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-googleid-info/main.go

all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info
plugin.unsecured-jwt-info:
CGO_ENABLED=0 go build -o build/unsecured-jwt-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-info/main.go

plugin.unsecured-jwt-provider:
CGO_ENABLED=0 go build -o build/unsecured-jwt-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-provider/main.go


all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info plugin.unsecured-jwt-provider

clean:
@rm -rf build
42 changes: 35 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ See:

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_linux_amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_linux_amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_darwin_amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_darwin_amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand Down Expand Up @@ -75,6 +75,7 @@ See:
--auth-local-command string Path to authentication plugin binary
--auth-local-enable Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers
--auth-local-log-level string Log level of the auth plugin (default "trace")
--auth-local-mechanism string SASL mechanism used for local authentication: PLAIN or OAUTHBEARER (default "PLAIN")
--auth-local-param stringArray Authentication plugin parameter
--auth-local-timeout duration Authentication timeout (default 10s)
--bootstrap-server-mapping stringArray Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))
Expand All @@ -84,7 +85,7 @@ See:
--dynamic-listeners-disable Disable dynamic listeners.
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
--forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
--forward-proxy string URL of the forward proxy. Supported schemas are http and socks5
--forward-proxy string URL of the forward proxy. Supported schemas are socks5 and http
-h, --help help for server
--http-disable Disable HTTP endpoints
--http-health-path string Path on which to health endpoint (default "/health")
Expand Down Expand Up @@ -112,9 +113,15 @@ See:
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
--proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096)
--sasl-enable Connect using SASL/PLAIN
--sasl-enable Connect using SASL
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
--sasl-password string SASL user password
--sasl-plugin-command string Path to authentication plugin binary
--sasl-plugin-enable Use plugin for SASL authentication
--sasl-plugin-log-level string Log level of the auth plugin (default "trace")
--sasl-plugin-mechanism string SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER (default "OAUTHBEARER")
--sasl-plugin-param stringArray Authentication plugin parameter
--sasl-plugin-timeout duration Authentication timeout (default 10s)
--sasl-username string SASL user name
--tls-ca-chain-cert-file string PEM encoded CA's certificate file
--tls-client-cert-file string PEM encoded file with client certificate
Expand All @@ -123,8 +130,6 @@ See:
--tls-enable Whether or not to use TLS when connecting to the broker
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name



### Usage example

kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,0.0.0.0:32399"
Expand All @@ -144,14 +149,29 @@ See:
--external-server-mapping "192.168.99.100:32402,127.0.0.1:32403" \
--forbidden-api-keys 20


export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server

### SASL authentication initiated by proxy example

SASL authentication is initiated by the proxy. SASL authentication is disabled on the clients and enabled on the Kafka brokers.

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
--tls-enable --tls-insecure-skip-verify \
--sasl-enable --sasl-username myuser --sasl-password mysecret

export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server
make clean build plugin.unsecured-jwt-provider && build/kafka-proxy server \
--sasl-enable \
--sasl-plugin-enable \
--sasl-plugin-mechanism "OAUTHBEARER" \
--sasl-plugin-command build/unsecured-jwt-provider \
--sasl-plugin-param "--claim-sub=alice" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

### Proxy authentication example

SASL authentication is performed by the proxy. SASL authentication is enabled on the clients and disabled on the Kafka brokers.

make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \
--proxy-listener-cert-file "server-cert.pem" \
--proxy-listener-ca-chain-cert-file "ca.pem" \
Expand All @@ -169,6 +189,14 @@ See:
--auth-local-param "--user-attr=uid" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

make clean build plugin.unsecured-jwt-info && build/kafka-proxy server \
--auth-local-enable \
--auth-local-command build/unsecured-jwt-info \
--auth-local-mechanism "OAUTHBEARER" \
--auth-local-param "--claim-sub=alice" \
--auth-local-param "--claim-sub=bob" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

### Kafka Gateway example

Authentication between Kafka Proxy Client and Kafka Proxy Server with Google-ID (service account JWT)
Expand Down
151 changes: 114 additions & 37 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (

"errors"
"github.com/grepplabs/kafka-proxy/pkg/apis"
gatewayclient "github.com/grepplabs/kafka-proxy/plugin/gateway-client/shared"
gatewayserver "github.com/grepplabs/kafka-proxy/plugin/gateway-server/shared"
localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"strings"
Expand Down Expand Up @@ -101,6 +101,7 @@ func initFlags() {
// local authentication plugin
Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers")
Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary")
Server.Flags().StringVar(&c.Auth.Local.Mechanism, "auth-local-mechanism", "PLAIN", "SASL mechanism used for local authentication: PLAIN or OAUTHBEARER")
Server.Flags().StringArrayVar(&c.Auth.Local.Parameters, "auth-local-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Auth.Local.LogLevel, "auth-local-log-level", "trace", "Log level of the auth plugin")
Server.Flags().DurationVar(&c.Auth.Local.Timeout, "auth-local-timeout", 10*time.Second, "Authentication timeout")
Expand Down Expand Up @@ -142,12 +143,20 @@ func initFlags() {
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")

// SASL
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL/PLAIN")
// SASL by Proxy
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL")
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")
Server.Flags().StringVar(&c.Kafka.SASL.Password, "sasl-password", "", "SASL user password")
Server.Flags().StringVar(&c.Kafka.SASL.JaasConfigFile, "sasl-jaas-config-file", "", "Location of JAAS config file with SASL username and password")

// SASL by Proxy plugin
Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication")
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Command, "sasl-plugin-command", "", "Path to authentication plugin binary")
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Mechanism, "sasl-plugin-mechanism", "OAUTHBEARER", "SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER")
Server.Flags().StringArrayVar(&c.Kafka.SASL.Plugin.Parameters, "sasl-plugin-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin")
Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout")

// Web
Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints")
Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on")
Expand All @@ -172,47 +181,115 @@ func initFlags() {
func Run(_ *cobra.Command, _ []string) {
logrus.Infof("Starting kafka-proxy version %s", config.Version)

var passwordAuthenticator apis.PasswordAuthenticator
var localPasswordAuthenticator apis.PasswordAuthenticator
var localTokenAuthenticator apis.TokenInfo
if c.Auth.Local.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory)
if ok {
logrus.Infof("Using built-in PasswordAuthenticator")
passwordAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
switch c.Auth.Local.Mechanism {
case "PLAIN":
var err error
factory, ok := registry.GetComponent(new(apis.PasswordAuthenticatorFactory), c.Auth.Local.Command).(apis.PasswordAuthenticatorFactory)
if ok {
logrus.Infof("Using built-in '%s' PasswordAuthenticator for local PasswordAuthenticator", c.Auth.Local.Command)
localPasswordAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(localauth.Handshake, localauth.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("passwordAuthenticator")
if err != nil {
logrus.Fatal(err)
}
localPasswordAuthenticator, ok = raw.(apis.PasswordAuthenticator)
if !ok {
logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type"))
}
}
raw, err := rpcClient.Dispense("passwordAuthenticator")
if err != nil {
logrus.Fatal(err)
case "OAUTHBEARER":
var err error
factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Local.Command).(apis.TokenInfoFactory)
if ok {
logrus.Infof("Using built-in '%s' TokenInfo for local TokenAuthenticator", c.Auth.Local.Command)

localTokenAuthenticator, err = factory.New(c.Auth.Local.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Local.LogLevel, c.Auth.Local.Command, c.Auth.Local.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("tokenInfo")
if err != nil {
logrus.Fatal(err)
}
localTokenAuthenticator, ok = raw.(apis.TokenInfo)
if !ok {
logrus.Fatal(errors.New("unsupported TokenInfo plugin type"))
}
}
passwordAuthenticator, ok = raw.(apis.PasswordAuthenticator)
if !ok {
logrus.Fatal(errors.New("unsupported PasswordAuthenticator plugin type"))
default:
logrus.Fatal(errors.New("unsupported local auth mechanism"))
}
}

var saslTokenProvider apis.TokenProvider
if c.Kafka.SASL.Plugin.Enable {
switch c.Kafka.SASL.Plugin.Mechanism {
case "OAUTHBEARER":
var err error
factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Kafka.SASL.Plugin.Command).(apis.TokenProviderFactory)
if ok {
logrus.Infof("Using built-in '%s' TokenProvider for sasl authentication", c.Kafka.SASL.Plugin.Command)

saslTokenProvider, err = factory.New(c.Kafka.SASL.Plugin.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Kafka.SASL.Plugin.LogLevel, c.Kafka.SASL.Plugin.Command, c.Kafka.SASL.Plugin.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
if err != nil {
logrus.Fatal(err)
}
raw, err := rpcClient.Dispense("tokenProvider")
if err != nil {
logrus.Fatal(err)
}
saslTokenProvider, ok = raw.(apis.TokenProvider)
if !ok {
logrus.Fatal(errors.New("unsupported TokenProvider plugin type"))
}
}
default:
logrus.Fatal(errors.New("unsupported sasl auth mechanism"))
}
}

var tokenProvider apis.TokenProvider
var gatewayTokenProvider apis.TokenProvider
if c.Auth.Gateway.Client.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Auth.Gateway.Client.Command).(apis.TokenProviderFactory)
if ok {
logrus.Infof("Using built-in TokenProvider")
tokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters)
logrus.Infof("Using built-in '%s' TokenProvider for Gateway Client", c.Auth.Gateway.Client.Command)
gatewayTokenProvider, err = factory.New(c.Auth.Gateway.Client.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(gatewayclient.Handshake, gatewayclient.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters)
client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Auth.Gateway.Client.LogLevel, c.Auth.Gateway.Client.Command, c.Auth.Gateway.Client.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
Expand All @@ -223,26 +300,26 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
tokenProvider, ok = raw.(apis.TokenProvider)
gatewayTokenProvider, ok = raw.(apis.TokenProvider)
if !ok {
logrus.Fatal(errors.New("unsupported TokenProvider plugin type"))
}
}
}

var tokenInfo apis.TokenInfo
var gatewayTokenInfo apis.TokenInfo
if c.Auth.Gateway.Server.Enable {
var err error
factory, ok := registry.GetComponent(new(apis.TokenInfoFactory), c.Auth.Gateway.Server.Command).(apis.TokenInfoFactory)
if ok {
logrus.Infof("Using built-in TokenInfo")
logrus.Infof("Using built-in '%s' TokenInfo for Gateway Server", c.Auth.Gateway.Server.Command)

tokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters)
gatewayTokenInfo, err = factory.New(c.Auth.Gateway.Server.Parameters)
if err != nil {
logrus.Fatal(err)
}
} else {
client := NewPluginClient(gatewayserver.Handshake, gatewayserver.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters)
client := NewPluginClient(tokeninfo.Handshake, tokeninfo.PluginMap, c.Auth.Gateway.Server.LogLevel, c.Auth.Gateway.Server.Command, c.Auth.Gateway.Server.Parameters)
defer client.Kill()

rpcClient, err := client.Client()
Expand All @@ -253,7 +330,7 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
tokenInfo, ok = raw.(apis.TokenInfo)
gatewayTokenInfo, ok = raw.(apis.TokenInfo)
if !ok {
logrus.Fatal(errors.New("unsupported TokenInfo plugin type"))
}
Expand All @@ -273,7 +350,7 @@ func Run(_ *cobra.Command, _ []string) {
if err != nil {
logrus.Fatal(err)
}
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, passwordAuthenticator, tokenProvider, tokenInfo)
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, saslTokenProvider, gatewayTokenProvider, gatewayTokenInfo)
if err != nil {
logrus.Fatal(err)
}
Expand Down
Loading