-
Notifications
You must be signed in to change notification settings - Fork 189
/
gateway.go
93 lines (82 loc) · 3.43 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright © 2020 Banzai Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package istioingress
import (
"fmt"
"github.com/banzaicloud/istio-client-go/pkg/networking/v1alpha3"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/resources/templates"
"github.com/banzaicloud/koperator/pkg/util"
kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
)
// gateway assembles the Istio Gateway object for the given external listener
// and ingress configuration of the reconciled Kafka cluster.
//
// The gateway name is formatted from gatewayNameTemplate when ingressConfigName
// equals util.IngressConfigGlobalName; otherwise gatewayNameTemplateWithScope is
// used and the ingress config name is passed as an additional format argument.
// The object's labels and the gateway selector are both produced by
// labelsForIstioIngress, and the server list is produced by generateServers.
//
// The result is a *v1alpha3.Gateway returned as a runtime.Object.
func (r *Reconciler) gateway(log logr.Logger, externalListenerConfig v1beta1.ExternalListenerConfig,
	ingressConf v1beta1.IngressConfig, ingressConfigName, defaultIngressConfigName string) runtime.Object {
	clusterName := r.KafkaCluster.Name
	listenerName := externalListenerConfig.Name

	labelName := util.ConstructEListenerLabelName(ingressConfigName, listenerName)

	var name string
	switch ingressConfigName {
	case util.IngressConfigGlobalName:
		name = fmt.Sprintf(gatewayNameTemplate, clusterName, listenerName)
	default:
		name = fmt.Sprintf(gatewayNameTemplateWithScope, clusterName, listenerName, ingressConfigName)
	}

	// labelsForIstioIngress is called once per use so that the metadata labels
	// and the selector do not share the same returned value.
	meta := templates.ObjectMeta(name, labelsForIstioIngress(clusterName, labelName), r.KafkaCluster)
	selector := labelsForIstioIngress(clusterName, labelName)
	servers := generateServers(r.KafkaCluster, externalListenerConfig, log, ingressConf,
		ingressConfigName, defaultIngressConfigName)

	return &v1alpha3.Gateway{
		ObjectMeta: meta,
		Spec: v1alpha3.GatewaySpec{
			Selector: selector,
			Servers:  servers,
		},
	}
}
// generateServers builds the list of Istio gateway servers for an external
// listener.
//
// One server is added for every broker ID returned by
// util.GetBrokerIdsFromStatusAndSpec for which a broker config can be gathered
// and util.ShouldIncludeBroker reports true; its port number is the listener's
// ExternalStartingPort plus the broker ID. Brokers whose config cannot be
// gathered are logged and skipped. A final server on the listener's anycast
// port (GetAnyCastPort) is always appended last.
//
// When ingressConf.IstioIngressConfig.TLSOptions is set, every server carries
// those TLS options and uses the TLS protocol; otherwise TLS is nil and the
// protocol is TCP. All servers use the wildcard host "*".
func generateServers(kc *v1beta1.KafkaCluster, externalListenerConfig v1beta1.ExternalListenerConfig, log logr.Logger,
	ingressConf v1beta1.IngressConfig, ingressConfigName, defaultIngressConfigName string) []v1alpha3.Server {
	// A nil TLSOptions pointer means plain TCP; anything else switches to TLS.
	tlsOpts := ingressConf.IstioIngressConfig.TLSOptions
	proto := v1alpha3.ProtocolTCP
	if tlsOpts != nil {
		proto = v1alpha3.ProtocolTLS
	}

	// newServer creates a wildcard-host server entry on the given port that
	// shares the protocol and TLS settings chosen above.
	newServer := func(portNumber int, portName string) v1alpha3.Server {
		return v1alpha3.Server{
			Port: &v1alpha3.Port{
				Number:   portNumber,
				Protocol: proto,
				Name:     portName,
			},
			TLS:   tlsOpts,
			Hosts: []string{"*"},
		}
	}

	result := []v1alpha3.Server{}

	for _, id := range util.GetBrokerIdsFromStatusAndSpec(kc.Status.BrokersState, kc.Spec.Brokers, log) {
		cfg, err := kafkautils.GatherBrokerConfigIfAvailable(kc.Spec, id)
		if err != nil {
			log.Error(err, "could not determine brokerConfig")
			continue
		}
		if !util.ShouldIncludeBroker(cfg, kc.Status, id, defaultIngressConfigName, ingressConfigName) {
			continue
		}
		result = append(result, newServer(
			int(externalListenerConfig.ExternalStartingPort)+id,
			fmt.Sprintf("tcp-broker-%d", id),
		))
	}

	// The anycast entry is added unconditionally, after the per-broker entries.
	result = append(result, newServer(
		int(externalListenerConfig.GetAnyCastPort()),
		fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, "tcp"),
	))

	return result
}