-
Notifications
You must be signed in to change notification settings - Fork 61
/
provider.go
115 lines (94 loc) · 3.68 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package kafka
import (
"fmt"
"strings"
crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
cyndi "github.com/RedHatInsights/cyndi-operator/api/v1alpha1"
core "k8s.io/api/core/v1"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
rc "github.com/RedHatInsights/rhc-osdk-utils/resource_cache"
)
var IMAGE_KAFKA_XJOIN = "quay.io/cloudservices/xjoin-kafka-connect-strimzi:latest"
// ProvName is the name/ident of the provider
var ProvName = "kafka"
// CyndiPipeline identifies the main cyndi pipeline object.
var CyndiPipeline = rc.NewSingleResourceIdent(ProvName, "cyndi_pipeline", &cyndi.CyndiPipeline{})
// CyndiAppSecret identifies the cyndi app secret object.
var CyndiAppSecret = rc.NewSingleResourceIdent(ProvName, "cyndi_app_secret", &core.Secret{})
// CyndiHostInventoryAppSecret identifies the cyndi host-inventory app secret object.
var CyndiHostInventoryAppSecret = rc.NewSingleResourceIdent(ProvName, "cyndi_host_inventory_secret", &core.Secret{})
// CyndiConfigMap is the resource ident for a CyndiConfigMap object.
var CyndiConfigMap = rc.NewSingleResourceIdent(ProvName, "cyndi_config_map", &core.ConfigMap{}, rc.ResourceOptions{WriteNow: true})
// GetKafka returns the correct kafka provider based on the environment.
func GetKafka(c *providers.Provider) (providers.ClowderProvider, error) {
c.Env.ConvertDeprecatedKafkaSpec()
kafkaMode := c.Env.Spec.Providers.Kafka.Mode
switch kafkaMode {
case "operator":
return NewStrimzi(c)
case "app-interface":
return NewAppInterface(c)
case "managed":
return NewManagedKafka(c)
case "managed-ephem":
return NewManagedEphemKafka(c)
case "none", "":
return NewNoneKafka(c)
default:
errStr := fmt.Sprintf("No matching kafka mode for %s", kafkaMode)
return nil, errors.New(errStr)
}
}
// GetKafka returns the correct kafka provider based on the environment.
func GetKafkaFinalize(c *providers.Provider) error {
c.Env.ConvertDeprecatedKafkaSpec()
kafkaMode := c.Env.Spec.Providers.Kafka.Mode
switch kafkaMode {
case "managed-ephem":
return NewManagedEphemKafkaFinalizer(c)
case "none", "":
return nil
default:
c.Log.Info(fmt.Sprintf("No matching kafka mode for %s", kafkaMode))
return nil
}
}
func getKafkaUsername(env *crd.ClowdEnvironment, app *crd.ClowdApp) string {
return fmt.Sprintf("%s-%s", env.Name, app.Name)
}
func getKafkaName(e *crd.ClowdEnvironment) string {
if e.Spec.Providers.Kafka.Cluster.Name == "" {
// generate a unique name based on the ClowdEnvironment's UID
// convert e.UID (which is a apimachinery types.UID) to string
// types.UID is a string alias so this should not fail...
uidString := string(e.UID)
// append the initial portion of the UUID onto the kafka cluster's name
return fmt.Sprintf("%s-%s", e.Name, strings.Split(uidString, "-")[0])
}
return e.Spec.Providers.Kafka.Cluster.Name
}
func getKafkaNamespace(e *crd.ClowdEnvironment) string {
if e.Spec.Providers.Kafka.Cluster.Namespace == "" {
return e.Status.TargetNamespace
}
return e.Spec.Providers.Kafka.Cluster.Namespace
}
func getConnectNamespace(env *crd.ClowdEnvironment) string {
if env.Spec.Providers.Kafka.Connect.Namespace == "" {
return getKafkaNamespace(env)
}
return env.Spec.Providers.Kafka.Connect.Namespace
}
func getConnectClusterName(env *crd.ClowdEnvironment) string {
if env.Spec.Providers.Kafka.Connect.Name == "" {
return getKafkaName(env)
}
return env.Spec.Providers.Kafka.Connect.Name
}
func getConnectClusterUserName(env *crd.ClowdEnvironment) string {
return fmt.Sprintf("%s-connect", env.Name)
}
func init() {
providers.ProvidersRegistration.Register(GetKafka, 6, ProvName, GetKafkaFinalize)
}