Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
feat: set a reachable address for proxy that brokers will use for adv…
Browse files Browse the repository at this point in the history
…ertisement (#63)
  • Loading branch information
smoya committed Oct 18, 2021
1 parent ad09007 commit 67d90c3
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 26 deletions.
3 changes: 3 additions & 0 deletions config/kafka.go
Expand Up @@ -17,6 +17,7 @@ import (

// KafkaProxy holds the config for later configuring a Kafka proxy.
type KafkaProxy struct {
Address string `desc:"Address for this proxy. Should be reachable by your clients. Most probably a domain."`
BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"`
MessageValidation MessageValidation `split_words:"true"`
TLS *kafka.TLSConfig
Expand Down Expand Up @@ -76,7 +77,9 @@ func (c *KafkaProxy) ProxyConfig(d []byte, debug bool) (*kafka.ProxyConfig, erro
if err != nil {
return nil, err
}

conf.TLS = c.TLS
conf.Address = c.Address

return conf, nil
}
Expand Down
21 changes: 11 additions & 10 deletions docs/config/kafka.md
Expand Up @@ -6,10 +6,10 @@
## Configuration
The Kafka proxy is mostly configured by setting some properties on the `server` object.

| Server property | Type | Description | Default | Required | examples |
|-----------------------------|--------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------|
| x-eventgateway-listener | string | Configure the mapping between remote broker address (the address set in the broker server `URL` field) and desired local address. Format is `remotehost:remoteport,localhost:localport`. Multiple values can be configured by using pipe separation (`\|`) | `0.0.0.0:<remote-server-port>` | No | `test.mykafkacluster.org:8092,localhost:28002`, `test.mykafkacluster.org:8092,localhost:28002\|test2.mykafkacluster.org:8092,localhost:28003` |
| x-eventgateway-dial-mapping | string | Configure the mapping between published remote broker address and the address the proxy will use when forwarding requests. Format is `local_advertised_host:local_advertised_port,remotehost:remoteport`. Multiple values can be configured by using pipe separation (`\|`) | - | No | `0.0.0.0:8092,test.myeventgateway.org:8092`, `0.0.0.0:8092,test.myeventgateway.org:8092,\|0.0.0.0:8093,test.myeventgateway.org:8093` |
| Server property | Type | Description | Default | Required | examples |
|-------------------------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------|
| x-eventgateway-listener | string | Configure the mapping between remote broker address (the address set in the broker server `URL` field) and desired local address. Format is `remotehost:remoteport,localhost:localport`. Multiple values can be configured by using pipe separation (`\|`) | `0.0.0.0:<remote-server-port>` | No | `test.mykafkacluster.org:8092,localhost:28002`, `test.mykafkacluster.org:8092,localhost:28002\|test2.mykafkacluster.org:8092,localhost:28003` |
| x-eventgateway-dial-mapping | string | Override a broker address with another address the proxy will use when connecting. Format is `remotehost:remoteport,new-remotehost:new-remoteport`. Multiple values can be configured by using pipe separation (`\|`) | - | No | `0.0.0.0:8092,test.myeventgateway.org:8092`, `test.mykafkacluster.org:8092,mykafkacluster.org:8092,\|`test.mykafkacluster.org:8093,mykafkacluster.org:8093` |

#### Example
```yaml
Expand All @@ -19,15 +19,16 @@ servers:
url: broker.mybrokers.org:9092
protocol: kafka
x-eventgateway-listener: 28002 # optional. 0.0.0.0:9092 will be used instead if missing.
x-eventgateway-dial-mapping: '0.0.0.0:28002,test.myeventgateway.org:28002' # optional.
x-eventgateway-dial-mapping: '0.0.0.0:28002,test.myeventgateway.org:8092' # optional.
# ...
```

## Advanced configuration
Some advanced configuration is only available through environment variables.

| Environment variable | Type | Description | Default | Required | examples |
|---------------------------------------------|--------|--------------------------------------------------------------------------------|---------|----------|----------------------------------|
| EVENTGATEWAY_KAFKA_PROXY_BROKER_FROM_SERVER | string | When set, only the specified server will be considered instead of all servers. | - | No | `name-of-server1`, `server-test` |
| EVENTGATEWAY_KAFKA_PROXY_MESSAGE_VALIDATION_ENABLED | boolean | Enable or disable validation of Kafka messages | `true` | No | `true`, `false` |
| EVENTGATEWAY_KAFKA_PROXY_EXTRA_FLAGS | string | Advanced configuration. Configure any flag from [here](https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195). Multiple values can be configured by using pipe separation (`\|`) | - | No | `tls-enable=true\|tls-client-cert-file=/opt/var/service.cert\|tls-client-key-file=/opt/var/service.key` |
| Environment variable | Type | Description | Default | Required | examples |
|-----------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|----------|---------------------------------------------------------------------------------------------------------|
| EVENTGATEWAY_KAFKA_PROXY_ADDRESS | string | Address for this proxy. Clients will use this address as host when connecting to the brokers through this proxy, so it should be reachable by your clients. Most probably a domain name. | `0.0.0.0` | No | `event-gateway-demo.asyncapi.org` |
| EVENTGATEWAY_KAFKA_PROXY_BROKER_FROM_SERVER | string | When set, only the specified server will be considered instead of all servers. | - | No | `name-of-server1`, `server-test` |
| EVENTGATEWAY_KAFKA_PROXY_MESSAGE_VALIDATION_ENABLED | boolean | Enable or disable validation of Kafka messages | `true` | No | `true`, `false` |
| EVENTGATEWAY_KAFKA_PROXY_EXTRA_FLAGS | string | Advanced configuration. Configure any flag from [here](https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195). Multiple values can be configured by using pipe separation (`\|`) | - | No | `tls-enable=true\|tls-client-cert-file=/opt/var/service.cert\|tls-client-key-file=/opt/var/service.key` |
5 changes: 4 additions & 1 deletion kafka/config.go
Expand Up @@ -19,6 +19,9 @@ var localHostIpv4 = regexp.MustCompile(`127\.0\.0\.\d+`)

// ProxyConfig holds the configuration for the Kafka Proxy.
type ProxyConfig struct {
// Address for this proxy. Should be reachable by your clients. Most probably a domain name.
// If not set, 0.0.0.0 will be used.
Address string
BrokersMapping []string
DialAddressMapping []string
ExtraConfig []string
Expand Down Expand Up @@ -70,7 +73,7 @@ func (c *TLSConfig) Config() (*tls.Config, error) {
// ProxyOption represents a functional configuration for the Proxy.
type ProxyOption func(*ProxyConfig) error

// WithMessageHandler ...
// WithMessageHandler configures a handler that will handle all incoming messages.
func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption {
return func(c *ProxyConfig) error {
c.MessageHandler = handler
Expand Down
46 changes: 31 additions & 15 deletions kafka/proxy.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"net"
"strings"

"github.com/Shopify/sarama"
Expand All @@ -30,6 +31,8 @@ const (
messagesChannelName = "kafka-produced-messages"
)

const defaultLocalAddress = "0.0.0.0"

var defaultMarshaler = watermillkafka.DefaultMarshaler{}

// NewProxy creates a new Kafka Proxy based on a given configuration.
Expand All @@ -42,18 +45,34 @@ func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error) {
return nil, err
}

// Yeah, not a good practice at all but I guess it's fine for now.
kafkaproxy.ActualDefaultRequestHandler.RequestKeyHandlers.Set(RequestAPIKeyProduce, NewProduceRequestHandler(r, c.MessageHandler, c.MessagePublisher, c.PublishToTopic))

// Setting some defaults.
_ = server.Server.Flags().Set("default-listener-ip", "0.0.0.0") // Binding to all local network interfaces. Needed for external calls.
// Binding to all local network interfaces instead of 127.0.0.1. Sometimes needed for external calls such as health checks.
_ = server.Server.Flags().Set("default-listener-ip", defaultLocalAddress)

if c.BrokersMapping == nil {
return nil, errors.New("Brokers mapping is required")
reachableAddress := c.Address
if reachableAddress == "" {
reachableAddress = defaultLocalAddress
}

if c.Debug {
_ = server.Server.Flags().Set("log-level", "debug")
for _, v := range c.BrokersMapping {
values := strings.Split(v, ",")
if len(values) == 2 && reachableAddress != defaultLocalAddress {
// If there is no advertisedAddress set in the mapping (there is no third value) but an address has been set for the proxy,
// then advertise brokers using that given proxy address so Kafka clients can reach the brokers through the proxy.
_, localBindingPort, _ := net.SplitHostPort(values[1])
v += fmt.Sprintf(",%s:%s", reachableAddress, localBindingPort)
}

_ = server.Server.Flags().Set("bootstrap-server-mapping", v)
}

// As previously done for bootstrap brokers, all dynamic listeners created when brokers are discovered
// will be advertised to Kafka clients using the configured address or a default.
_ = server.Server.Flags().Set("dynamic-advertised-listener", reachableAddress)

for _, v := range c.DialAddressMapping {
_ = server.Server.Flags().Set("dial-address-mapping", v)
}

if c.TLS != nil && c.TLS.Enable {
Expand All @@ -64,19 +83,16 @@ func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error) {
_ = server.Server.Flags().Set("tls-ca-chain-cert-file", c.TLS.CAChainCertFile)
}

if c.Debug {
_ = server.Server.Flags().Set("log-level", "debug")
}

// Some config can be overridden here. It is intentional.
for _, v := range c.ExtraConfig {
f := strings.Split(v, "=")
_ = server.Server.Flags().Set(f[0], f[1])
}

for _, v := range c.BrokersMapping {
_ = server.Server.Flags().Set("bootstrap-server-mapping", v)
}

for _, v := range c.DialAddressMapping {
_ = server.Server.Flags().Set("dial-address-mapping", v)
}

return func(ctx context.Context) error {
return server.Server.Execute()
}, nil
Expand Down

0 comments on commit 67d90c3

Please sign in to comment.