Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Public internal #17

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4185,10 +4185,26 @@ spec:
containerPort:
format: int32
type: integer
ingressForwarded:
description: If set to true, Envoy will create routes for
the internal listener ports. Required if `ingressForwarded`
or `useExternalHostname` is set.
type: boolean
internalStartingPort:
description: This following options are helpful when you want
to run a Kafka cluster over multiple Kubernetes clusters.
The broker internal ports are computed as the sum of the
internalStartingPort and the broker id.
format: int32
type: integer
name:
type: string
type:
type: string
useExternalHostname:
description: If set to true, the Kafka brokers will use the
external hostname for inter broker communication.
type: boolean
usedForControllerCommunication:
type: boolean
usedForInnerBrokerCommunication:
Expand Down
130 changes: 82 additions & 48 deletions pkg/resources/envoy/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,57 +82,37 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, envoyConfig *v1beta1.EnvoyCon
}
}
if kc.Spec.ListenersConfig.ExternalListeners != nil {
listeners = append(listeners, &envoyapi.Listener{
Address: &envoycore.Address{
Address: &envoycore.Address_SocketAddress{
SocketAddress: &envoycore.SocketAddress{
Address: "0.0.0.0",
PortSpecifier: &envoycore.SocketAddress_PortValue{
PortValue: uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ExternalStartingPort + int32(brokerId)),
},
},
},
},
FilterChains: []*envoylistener.FilterChain{
{
Filters: []*envoylistener.Filter{
{
Name: wellknown.TCPProxy,
ConfigType: &envoylistener.Filter_Config{
Config: &ptypesstruct.Struct{
Fields: map[string]*ptypesstruct.Value{
"stat_prefix": {Kind: &ptypesstruct.Value_StringValue{StringValue: fmt.Sprintf("broker_tcp-%d", brokerId)}},
"cluster": {Kind: &ptypesstruct.Value_StringValue{StringValue: fmt.Sprintf("broker-%d", brokerId)}},
},
},
},
},
},
},
},
})
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-external", brokerId),
uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ExternalStartingPort+int32(brokerId))))
}

for _, internalListener := range kc.Spec.ListenersConfig.InternalListeners {
if internalListener.IngressForwarded && internalListener.InternalStartingPort > 0 {
if internalListener.UsedForInnerBrokerCommunication {
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-internal", brokerId),
uint32(internalListener.InternalStartingPort+int32(brokerId))))
} else if internalListener.UsedForControllerCommunication {
listeners = append(listeners, envoyListener(fmt.Sprintf("broker-%d-controller", brokerId),
uint32(internalListener.InternalStartingPort+int32(brokerId))))
}
}
}

if kc.Spec.ListenersConfig.ExternalListeners != nil {
clusters = append(clusters, &envoyapi.Cluster{
Name: fmt.Sprintf("broker-%d", brokerId),
ConnectTimeout: &duration.Duration{Seconds: 1},
ClusterDiscoveryType: &envoyapi.Cluster_Type{Type: envoyapi.Cluster_STRICT_DNS},
LbPolicy: envoyapi.Cluster_ROUND_ROBIN,
Http2ProtocolOptions: &envoycore.Http2ProtocolOptions{},
Hosts: []*envoycore.Address{
{
Address: &envoycore.Address_SocketAddress{
SocketAddress: &envoycore.SocketAddress{
Address: fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s", kc.Name, brokerId, kc.Name, kc.Namespace, kc.Spec.GetKubernetesClusterDomain()),
PortSpecifier: &envoycore.SocketAddress_PortValue{
PortValue: uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ContainerPort),
},
},
},
},
},
})
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-external", brokerId), uint32(brokerId),
uint32(kc.Spec.ListenersConfig.ExternalListeners[0].ContainerPort)))
}

for _, internalListener := range kc.Spec.ListenersConfig.InternalListeners {
if internalListener.IngressForwarded && internalListener.InternalStartingPort > 0 {
if internalListener.UsedForInnerBrokerCommunication {
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-internal", brokerId), uint32(brokerId),
uint32(internalListener.ContainerPort)))
} else {
clusters = append(clusters, envoyCluster(kc, fmt.Sprintf("broker-%d-controller", brokerId), uint32(brokerId),
uint32(internalListener.ContainerPort)))
}
}
}
}

Expand All @@ -159,6 +139,60 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, envoyConfig *v1beta1.EnvoyCon
return string(marshalledConfig)
}

func envoyListener(name string, containerPort uint32) *envoyapi.Listener {
return &envoyapi.Listener{
Address: &envoycore.Address{
Address: &envoycore.Address_SocketAddress{
SocketAddress: &envoycore.SocketAddress{
Address: "0.0.0.0",
PortSpecifier: &envoycore.SocketAddress_PortValue{
PortValue: containerPort,
},
},
},
},
FilterChains: []*envoylistener.FilterChain{
{
Filters: []*envoylistener.Filter{
{
Name: wellknown.TCPProxy,
ConfigType: &envoylistener.Filter_Config{
Config: &ptypesstruct.Struct{
Fields: map[string]*ptypesstruct.Value{
"stat_prefix": {Kind: &ptypesstruct.Value_StringValue{StringValue: fmt.Sprintf("tcp_%s", name)}},
"cluster": {Kind: &ptypesstruct.Value_StringValue{StringValue: name}},
},
},
},
},
},
},
},
}
}

func envoyCluster(kc *v1beta1.KafkaCluster, name string, brokerId, containerPort uint32) *envoyapi.Cluster {
return &envoyapi.Cluster{
Name: name,
ConnectTimeout: &duration.Duration{Seconds: 1},
ClusterDiscoveryType: &envoyapi.Cluster_Type{Type: envoyapi.Cluster_STRICT_DNS},
LbPolicy: envoyapi.Cluster_ROUND_ROBIN,
Http2ProtocolOptions: &envoycore.Http2ProtocolOptions{},
Hosts: []*envoycore.Address{
{
Address: &envoycore.Address_SocketAddress{
SocketAddress: &envoycore.SocketAddress{
Address: fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s", kc.Name, brokerId, kc.Name, kc.Namespace, kc.Spec.GetKubernetesClusterDomain()),
PortSpecifier: &envoycore.SocketAddress_PortValue{
PortValue: containerPort,
},
},
},
},
},
}
}

func configName(envoyConfig *v1beta1.EnvoyConfig) string {
if envoyConfig.Id == envoyGlobal {
return envoyVolumeAndConfigName
Expand Down
33 changes: 19 additions & 14 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *Reconciler) getConfigString(bConfig *v1beta1.BrokerConfig, id int32, se
"ListenerConfig": generateListenerSpecificConfig(&r.KafkaCluster.Spec.ListenersConfig, log),
"SSLEnabledForInternalCommunication": r.KafkaCluster.Spec.ListenersConfig.SSLSecrets != nil && util.IsSSLEnabledForInternalCommunication(r.KafkaCluster.Spec.ListenersConfig.InternalListeners),
"ZookeeperConnectString": zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath()),
"CruiseControlBootstrapServers": getInternalListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners, id, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled),
"CruiseControlBootstrapServers": getInternalListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners, id, bConfig.HostnameOverride, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled),
"StorageConfig": generateStorageConfig(bConfig.StorageConfigs),
"AdvertisedListenersConfig": generateAdvertisedListenerConfig(id, &r.KafkaCluster.Spec.ListenersConfig, bConfig.HostnameOverride, r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.Namespace, r.KafkaCluster.Name, r.KafkaCluster.Spec.HeadlessServiceEnabled),
"SuperUsers": strings.Join(generateSuperUsers(superUsers), ";"),
Expand Down Expand Up @@ -134,12 +134,17 @@ func generateAdvertisedListenerConfig(id int32, l *v1beta1.ListenersConfig, brok
fmt.Sprintf("%s://%s:%d", strings.ToUpper(eListener.Name), brokerHostname, eListener.ExternalStartingPort+id))
}
for _, iListener := range l.InternalListeners {
if headlessServiceEnabled {
if iListener.UseExternalHostname && iListener.InternalStartingPort > 0 {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort))
fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id))
} else {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort))
if headlessServiceEnabled {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort))
} else {
advertisedListenerConfig = append(advertisedListenerConfig,
fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort))
}
}
}
return fmt.Sprintf("advertised.listeners=%s\n", strings.Join(advertisedListenerConfig, ","))
Expand Down Expand Up @@ -195,21 +200,21 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, log logr.Logger)
"listeners=" + strings.Join(listenerConfig, ",") + "\n"
}

func getInternalListener(iListeners []v1beta1.InternalListenerConfig, id int32, domain, namespace, crName string, headlessServiceEnabled bool) string {

internalListener := ""

func getInternalListener(iListeners []v1beta1.InternalListenerConfig, id int32, brokerHostname, domain, namespace, crName string, headlessServiceEnabled bool) string {
for _, iListener := range iListeners {
if iListener.UsedForInnerBrokerCommunication {
if headlessServiceEnabled {
internalListener = fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort)
if iListener.UseExternalHostname && iListener.InternalStartingPort > 0 {
return fmt.Sprintf("%s://%s:%d", strings.ToUpper(iListener.Name), brokerHostname, iListener.InternalStartingPort+id)
} else {
internalListener = fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort)
if headlessServiceEnabled {
return fmt.Sprintf("%s://%s-%d.%s-headless.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, crName, namespace, domain, iListener.ContainerPort)
} else {
return fmt.Sprintf("%s://%s-%d.%s.svc.%s:%d", strings.ToUpper(iListener.Name), crName, id, namespace, domain, iListener.ContainerPort)
}
}
}
}

return internalListener
return ""
}

func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, serverPass, clientPass string, superUsers []string, log logr.Logger) string {
Expand Down
11 changes: 11 additions & 0 deletions pkg/sdk/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
InternalStartingPort int32 `json:"internalStartingPort"`
// If set to true, Envoy will create routes for the internal listener ports.
// Required if `ingressForwarded` or `useExternalHostname` is set.
// +optional
IngressForwarded bool `json:"ingressForwarded,omitempty"`
// If set to true, the Kafka brokers will use the external hostname for inter broker communication.
// +optional
UseExternalHostname bool `json:"useExternalHostname,omitempty"`
}

// CommonListenerSpec defines the common building block for Listener type
Expand Down