-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_util.go
118 lines (90 loc) · 3.08 KB
/
kafka_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package kafka
import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"github.com/aerogear/charmil-host-example/pkg/cmd/factory"
"github.com/aerogear/charmil-host-example/pkg/common/commonerr"
"github.com/aerogear/charmil-host-example/pkg/connection"
"github.com/aerogear/charmil-host-example/pkg/kafka/kafkaerr"
"github.com/aerogear/charmil/core/utils/localize"
kafkamgmtclient "github.com/redhat-developer/app-services-sdk-go/kafkamgmt/apiv1/client"
)
// Precompiled patterns used by the Validator methods in this file.
var (
// validNameRegexp matches a name that starts with a lowercase ASCII letter,
// optionally followed by lowercase letters, digits or hyphens, and that does
// not end with a hyphen. Used by ValidateName.
validNameRegexp = regexp.MustCompile(`^[a-z]([-a-z0-9]*[a-z0-9])?$`)
// validSearchRegexp matches the empty string or any run of ASCII letters,
// digits, '-', '_' and '%'. Used by ValidateSearchInput.
validSearchRegexp = regexp.MustCompile(`^([a-zA-Z0-9-_%]*[a-zA-Z0-9-_%])?$`)
)
// Validator groups the validation methods for Kafka configuration values.
type Validator struct {
// Localizer resolves message IDs into the text of the errors returned by
// ValidateName and ValidateNameIsAvailable.
Localizer localize.Localizer
// Connection is called by ValidateNameIsAvailable to obtain the API
// connection used to look up existing Kafka instances.
Connection factory.ConnectionFunc
}
// ValidateName checks whether val is acceptable as the name of a Kafka instance.
//
// The checks run in this order, and the first failure is returned:
//   - val must be a string, otherwise a cast error is returned;
//   - the name must be between 1 and 32 bytes long, otherwise the localized
//     "kafka.validation.name.error.lengthError" message is returned;
//   - the name must match validNameRegexp, otherwise an invalid-name error is returned.
//
// A nil result means the name passed every check.
func (v *Validator) ValidateName(val interface{}) error {
	candidate, isString := val.(string)

	switch size := len(candidate); {
	case !isString:
		return commonerr.NewCastError(val, "string")
	case size == 0 || size > 32:
		return errors.New(v.Localizer.LocalizeByID("kafka.validation.name.error.lengthError"))
	case !validNameRegexp.MatchString(candidate):
		return kafkaerr.InvalidNameError(candidate)
	}

	return nil
}
// TransformKafkaRequestListItems applies TransformKafkaRequest to every element
// of items. The elements are overwritten in place and the same slice is returned.
// The main transformation is appending ":443" to the Bootstrap Server URL.
func TransformKafkaRequestListItems(items []kafkamgmtclient.KafkaRequest) []kafkamgmtclient.KafkaRequest {
	for idx := range items {
		// Store the transformed value back into its slot.
		items[idx] = *TransformKafkaRequest(&items[idx])
	}

	return items
}
// TransformKafkaRequest adjusts fields of the KafkaRequest payload object and
// returns the same pointer it was given.
//
// The only adjustment is appending ":443" to the bootstrap server host. The
// request is left untouched when the host is empty or already ends in ":443".
func TransformKafkaRequest(kafka *kafkamgmtclient.KafkaRequest) *kafkamgmtclient.KafkaRequest {
	host := kafka.GetBootstrapServerHost()

	// Only a non-empty host that lacks the suffix needs rewriting.
	needsPort := host != "" && !strings.HasSuffix(host, ":443")
	if needsPort {
		kafka.SetBootstrapServerHost(fmt.Sprintf("%v:443", host))
	}

	return kafka
}
// ValidateSearchInput checks the text supplied to filter Kafka instances.
//
// A non-string val produces a cast error. A string that does not match
// validSearchRegexp produces an invalid-search-value error. Otherwise nil is returned.
func (v *Validator) ValidateSearchInput(val interface{}) error {
	switch term := val.(type) {
	case string:
		if !validSearchRegexp.MatchString(term) {
			return kafkaerr.InvalidSearchValueError(term)
		}
		return nil
	default:
		return commonerr.NewCastError(val, "string")
	}
}
// ValidateNameIsAvailable checks if a kafka instance with the given name already exists.
//
// It obtains a connection through v.Connection and looks the name up with
// GetKafkaByName. A response with status code 200 is treated as "name already
// taken" and yields the localized "kafka.create.error.conflictError" message.
// Every other lookup outcome yields nil. An error from v.Connection is
// returned unchanged.
func (v *Validator) ValidateNameIsAvailable(val interface{}) error {
	// NOTE(review): a non-string val is not rejected here; name is then "" and
	// the lookup runs with an empty name. The sibling validators return a cast
	// error in that case - consider aligning.
	name, _ := val.(string)

	// Named conn so the local does not shadow the imported connection package.
	conn, err := v.Connection(connection.DefaultConfigSkipMasAuth)
	if err != nil {
		return err
	}

	// The lookup error is deliberately discarded: this check is best-effort and
	// only the HTTP status of the response decides the result.
	_, httpRes, _ := GetKafkaByName(context.Background(), conn.API().Kafka(), name)
	if httpRes == nil {
		return nil
	}

	// Close the body on every return path, including the conflict path below.
	if httpRes.Body != nil {
		defer httpRes.Body.Close()
	}

	if httpRes.StatusCode == 200 {
		return errors.New(v.Localizer.LocalizeByID("kafka.create.error.conflictError", localize.NewEntry("Name", name)))
	}

	return nil
}