From 67d90c32d1097114ad77087e2ad5520ff34bf224 Mon Sep 17 00:00:00 2001 From: Sergio Moya <1083296+smoya@users.noreply.github.com> Date: Mon, 18 Oct 2021 10:36:20 +0200 Subject: [PATCH] feat: set a reachable address for proxy that brokers will use for advertisement (#63) --- config/kafka.go | 3 +++ docs/config/kafka.md | 21 ++++++++++---------- kafka/config.go | 5 ++++- kafka/proxy.go | 46 +++++++++++++++++++++++++++++--------------- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/config/kafka.go b/config/kafka.go index 8901c09..68c6af6 100644 --- a/config/kafka.go +++ b/config/kafka.go @@ -17,6 +17,7 @@ import ( // KafkaProxy holds the config for later configuring a Kafka proxy. type KafkaProxy struct { + Address string `desc:"Address for this proxy. Should be reachable by your clients. Most probably a domain."` BrokerFromServer string `split_words:"true" desc:"When configuring from an AsyncAPI doc, this allows the user to only configure one server instead of all"` MessageValidation MessageValidation `split_words:"true"` TLS *kafka.TLSConfig @@ -76,7 +77,9 @@ func (c *KafkaProxy) ProxyConfig(d []byte, debug bool) (*kafka.ProxyConfig, erro if err != nil { return nil, err } + conf.TLS = c.TLS + conf.Address = c.Address return conf, nil } diff --git a/docs/config/kafka.md b/docs/config/kafka.md index 5e586d6..8ccbd32 100644 --- a/docs/config/kafka.md +++ b/docs/config/kafka.md @@ -6,10 +6,10 @@ ## Configuration The Kafka proxy is mostly configured by setting some properties on the `server` object. -| Server property | Type | Description | Default | Required | examples | -|-----------------------------|--------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------| -| x-eventgateway-listener | string | Configure the mapping between remote broker address (the address set in the broker server `URL` field) and desired local address. Format is `remotehost:remoteport,localhost:localport`. Multiple values can be configured by using pipe separation (`\|`) | `0.0.0.0:` | No | `test.mykafkacluster.org:8092,localhost:28002`, `test.mykafkacluster.org:8092,localhost:28002\|test2.mykafkacluster.org:8092,localhost:28003` | -| x-eventgateway-dial-mapping | string | Configure the mapping between published remote broker address and the address the proxy will use when forwarding requests. Format is `local_advertised_host:local_advertised_port,remotehost:remoteport`. Multiple values can be configured by using pipe separation (`\|`) | - | No | `0.0.0.0:8092,test.myeventgateway.org:8092`, `0.0.0.0:8092,test.myeventgateway.org:8092,\|0.0.0.0:8093,test.myeventgateway.org:8093` | +| Server property | Type | Description | Default | Required | examples | +|-------------------------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------| +| x-eventgateway-listener | string | Configure the mapping between remote broker address (the address set in the broker server `URL` field) and desired local address. Format is `remotehost:remoteport,localhost:localport`. Multiple values can be configured by using pipe separation (`\|`) | `0.0.0.0:` | No | `test.mykafkacluster.org:8092,localhost:28002`, `test.mykafkacluster.org:8092,localhost:28002\|test2.mykafkacluster.org:8092,localhost:28003` | +| x-eventgateway-dial-mapping | string | Override a broker address with another address the proxy will use when connecting. Format is `remotehost:remoteport,new-remotehost:new-remoteport`. Multiple values can be configured by using pipe separation (`\|`) | - | No | `0.0.0.0:8092,test.myeventgateway.org:8092`, `test.mykafkacluster.org:8092,mykafkacluster.org:8092,\|`test.mykafkacluster.org:8093,mykafkacluster.org:8093` | #### Example ```yaml @@ -19,15 +19,16 @@ servers: url: broker.mybrokers.org:9092 protocol: kafka x-eventgateway-listener: 28002 # optional. 0.0.0.0:9092 will be used instead if missing. - x-eventgateway-dial-mapping: '0.0.0.0:28002,test.myeventgateway.org:28002' # optional. + x-eventgateway-dial-mapping: '0.0.0.0:28002,test.myeventgateway.org:8092' # optional. # ... ``` ## Advanced configuration Some advanced configuration is only available through environment variables. -| Environment variable | Type | Description | Default | Required | examples | -|---------------------------------------------|--------|--------------------------------------------------------------------------------|---------|----------|----------------------------------| -| EVENTGATEWAY_KAFKA_PROXY_BROKER_FROM_SERVER | string | When set, only the specified server will be considered instead of all servers. | - | No | `name-of-server1`, `server-test` | -| EVENTGATEWAY_KAFKA_PROXY_MESSAGE_VALIDATION_ENABLED | boolean | Enable or disable validation of Kafka messages | `true` | No | `true`, `false` | -| EVENTGATEWAY_KAFKA_PROXY_EXTRA_FLAGS | string | Advanced configuration. Configure any flag from [here](https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195). Multiple values can be configured by using pipe separation (`\|`) | - | No | `tls-enable=true\|tls-client-cert-file=/opt/var/service.cert\|tls-client-key-file=/opt/var/service.key` | \ No newline at end of file +| Environment variable | Type | Description | Default | Required | examples | +|-----------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|----------|---------------------------------------------------------------------------------------------------------| +| EVENTGATEWAY_KAFKA_PROXY_ADDRESS | string | Address for this proxy. Clients will use this address as host when connecting to the brokers through this proxy, so it should be reachable by your clients. Most probably a domain name. | `0.0.0.0` | No | `event-gateway-demo.asyncapi.org` | +| EVENTGATEWAY_KAFKA_PROXY_BROKER_FROM_SERVER | string | When set, only the specified server will be considered instead of all servers. | - | No | `name-of-server1`, `server-test` | +| EVENTGATEWAY_KAFKA_PROXY_MESSAGE_VALIDATION_ENABLED | boolean | Enable or disable validation of Kafka messages | `true` | No | `true`, `false` | +| EVENTGATEWAY_KAFKA_PROXY_EXTRA_FLAGS | string | Advanced configuration. Configure any flag from [here](https://github.com/grepplabs/kafka-proxy/blob/4f3b89fbaecb3eb82426f5dcff5f76188ea9a9dc/cmd/kafka-proxy/server.go#L85-L195). Multiple values can be configured by using pipe separation (`\|`) | - | No | `tls-enable=true\|tls-client-cert-file=/opt/var/service.cert\|tls-client-key-file=/opt/var/service.key` | \ No newline at end of file diff --git a/kafka/config.go b/kafka/config.go index 9d3bfb8..40be883 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -19,6 +19,9 @@ var localHostIpv4 = regexp.MustCompile(`127\.0\.0\.\d+`) // ProxyConfig holds the configuration for the Kafka Proxy. type ProxyConfig struct { + // Address for this proxy. Should be reachable by your clients. Most probably a domain name. + // If not set, 0.0.0.0 will be used. + Address string BrokersMapping []string DialAddressMapping []string ExtraConfig []string @@ -70,7 +73,7 @@ func (c *TLSConfig) Config() (*tls.Config, error) { // ProxyOption represents a functional configuration for the Proxy. type ProxyOption func(*ProxyConfig) error -// WithMessageHandler ... +// WithMessageHandler configures a handler that will handle all incoming messages. func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption { return func(c *ProxyConfig) error { c.MessageHandler = handler diff --git a/kafka/proxy.go b/kafka/proxy.go index d2ea344..5bd00b8 100644 --- a/kafka/proxy.go +++ b/kafka/proxy.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "net" "strings" "github.com/Shopify/sarama" @@ -30,6 +31,8 @@ const ( messagesChannelName = "kafka-produced-messages" ) +const defaultLocalAddress = "0.0.0.0" + var defaultMarshaler = watermillkafka.DefaultMarshaler{} // NewProxy creates a new Kafka Proxy based on a given configuration. @@ -42,18 +45,34 @@ func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error) { return nil, err } - // Yeah, not a good practice at all but I guess it's fine for now. kafkaproxy.ActualDefaultRequestHandler.RequestKeyHandlers.Set(RequestAPIKeyProduce, NewProduceRequestHandler(r, c.MessageHandler, c.MessagePublisher, c.PublishToTopic)) - // Setting some defaults. - _ = server.Server.Flags().Set("default-listener-ip", "0.0.0.0") // Binding to all local network interfaces. Needed for external calls. + // Binding to all local network interfaces instead of 127.0.0.1. Sometimes needed for external calls such as health checks. + _ = server.Server.Flags().Set("default-listener-ip", defaultLocalAddress) - if c.BrokersMapping == nil { - return nil, errors.New("Brokers mapping is required") + reachableAddress := c.Address + if reachableAddress == "" { + reachableAddress = defaultLocalAddress } - if c.Debug { - _ = server.Server.Flags().Set("log-level", "debug") + for _, v := range c.BrokersMapping { + values := strings.Split(v, ",") + if len(values) == 2 && reachableAddress != defaultLocalAddress { + // If there is no advertisedAddress set in the mapping (there is no third value) but an address has been set for the proxy, + // then advertise brokers using that given proxy address so Kafka clients can reach the brokers through the proxy. + _, localBindingPort, _ := net.SplitHostPort(values[1]) + v += fmt.Sprintf(",%s:%s", reachableAddress, localBindingPort) + } + + _ = server.Server.Flags().Set("bootstrap-server-mapping", v) + } + + // As previously done for bootstrap brokers, all dynamic listeners created when brokers are discovered + // will be advertised to Kafka clients using the configured address or a default. + _ = server.Server.Flags().Set("dynamic-advertised-listener", reachableAddress) + + for _, v := range c.DialAddressMapping { + _ = server.Server.Flags().Set("dial-address-mapping", v) } if c.TLS != nil && c.TLS.Enable { @@ -64,19 +83,16 @@ func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error) { _ = server.Server.Flags().Set("tls-ca-chain-cert-file", c.TLS.CAChainCertFile) } + if c.Debug { + _ = server.Server.Flags().Set("log-level", "debug") + } + + // Some config can be overridden here. It is intentional. for _, v := range c.ExtraConfig { f := strings.Split(v, "=") _ = server.Server.Flags().Set(f[0], f[1]) } - for _, v := range c.BrokersMapping { - _ = server.Server.Flags().Set("bootstrap-server-mapping", v) - } - - for _, v := range c.DialAddressMapping { - _ = server.Server.Flags().Set("dial-address-mapping", v) - } - return func(ctx context.Context) error { return server.Server.Execute() }, nil