-
Notifications
You must be signed in to change notification settings - Fork 41
/
kafka.go
133 lines (118 loc) · 5.78 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package presenters
import (
"fmt"
"strings"
"time"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/api/dbapi"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/api/public"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/config"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger"
)
// ConvertKafkaRequest from payload to KafkaRequest
func ConvertKafkaRequest(kafkaRequestPayload public.KafkaRequestPayload, dbKafkarequest ...*dbapi.KafkaRequest) *dbapi.KafkaRequest {
var kafka *dbapi.KafkaRequest
if len(dbKafkarequest) == 0 {
kafka = &dbapi.KafkaRequest{}
} else {
kafka = dbKafkarequest[0]
}
kafka.Region = kafkaRequestPayload.Region
kafka.Name = kafkaRequestPayload.Name
kafka.CloudProvider = kafkaRequestPayload.CloudProvider
kafka.BillingCloudAccountId = shared.SafeString(kafkaRequestPayload.BillingCloudAccountId)
kafka.Marketplace = shared.SafeString(kafkaRequestPayload.Marketplace)
kafka.DesiredKafkaBillingModel = shared.SafeString(kafkaRequestPayload.BillingModel)
if kafkaRequestPayload.ReauthenticationEnabled != nil {
kafka.ReauthenticationEnabled = *kafkaRequestPayload.ReauthenticationEnabled
} else {
kafka.ReauthenticationEnabled = true // true by default
}
return kafka
}
// PresentKafkaRequest - create KafkaRequest in an appropriate format ready to be returned by the API
func PresentKafkaRequest(kafkaRequest *dbapi.KafkaRequest, kafkaConfig *config.KafkaConfig) (public.KafkaRequest, *errors.ServiceError) {
reference := PresentReference(kafkaRequest.ID, kafkaRequest)
var ingressThroughputPerSec, egressThroughputPerSec, maxDataRetentionPeriod string
var totalMaxConnections, maxPartitions, maxConnectionAttemptsPerSec int
var expiresAt *time.Time
if kafkaConfig != nil {
instanceSize, err := kafkaConfig.GetKafkaInstanceSize(kafkaRequest.InstanceType, kafkaRequest.SizeId)
if err != nil {
logger.Logger.Error(err)
} else {
ingressThroughputPerSec = instanceSize.IngressThroughputPerSec.String()
egressThroughputPerSec = instanceSize.EgressThroughputPerSec.String()
totalMaxConnections = instanceSize.TotalMaxConnections
maxPartitions = instanceSize.MaxPartitions
maxDataRetentionPeriod = instanceSize.MaxDataRetentionPeriod
maxConnectionAttemptsPerSec = instanceSize.MaxConnectionAttemptsPerSec
if instanceSize.LifespanSeconds != nil {
expiresAt = kafkaRequest.GetExpirationTime(*instanceSize.LifespanSeconds)
}
}
}
displayName, err := getDisplayName(kafkaRequest.InstanceType, kafkaConfig)
if err != nil {
return public.KafkaRequest{}, err
}
// convert kafka storage size to bytes
maxDataRetentionSizeQuantity := config.Quantity(kafkaRequest.KafkaStorageSize)
maxDataRetentionSizeBytes, conversionErr := maxDataRetentionSizeQuantity.ToInt64()
if conversionErr != nil {
return public.KafkaRequest{}, errors.NewWithCause(errors.ErrorGeneral, conversionErr, "failed to get bytes value for max_data_retention_size")
}
return public.KafkaRequest{
Id: reference.Id,
Kind: reference.Kind,
Href: reference.Href,
Region: kafkaRequest.Region,
Name: kafkaRequest.Name,
CloudProvider: kafkaRequest.CloudProvider,
MultiAz: kafkaRequest.MultiAZ,
Owner: kafkaRequest.Owner,
BootstrapServerHost: setBootstrapServerHost(kafkaRequest.BootstrapServerHost),
AdminApiServerUrl: kafkaRequest.AdminApiServerURL,
Status: kafkaRequest.Status,
CreatedAt: kafkaRequest.CreatedAt,
UpdatedAt: kafkaRequest.UpdatedAt,
ExpiresAt: expiresAt,
FailedReason: kafkaRequest.FailedReason,
Version: kafkaRequest.ActualKafkaVersion,
InstanceType: kafkaRequest.InstanceType,
ReauthenticationEnabled: kafkaRequest.ReauthenticationEnabled,
DeprecatedKafkaStorageSize: kafkaRequest.KafkaStorageSize,
MaxDataRetentionSize: public.SupportedKafkaSizeBytesValueItem{
Bytes: maxDataRetentionSizeBytes,
},
BrowserUrl: fmt.Sprintf("%s/%s/dashboard", strings.TrimSuffix(kafkaConfig.BrowserUrl, "/"), reference.Id),
SizeId: kafkaRequest.SizeId,
DeprecatedInstanceTypeName: displayName,
DeprecatedIngressThroughputPerSec: ingressThroughputPerSec,
DeprecatedEgressThroughputPerSec: egressThroughputPerSec,
DeprecatedTotalMaxConnections: int32(totalMaxConnections),
DeprecatedMaxPartitions: int32(maxPartitions),
DeprecatedMaxDataRetentionPeriod: maxDataRetentionPeriod,
DeprecatedMaxConnectionAttemptsPerSec: int32(maxConnectionAttemptsPerSec),
BillingCloudAccountId: kafkaRequest.BillingCloudAccountId,
Marketplace: kafkaRequest.Marketplace,
BillingModel: kafkaRequest.ActualKafkaBillingModel,
}, nil
}
func setBootstrapServerHost(bootstrapServerHost string) string {
if bootstrapServerHost != "" {
return fmt.Sprintf("%s:443", bootstrapServerHost)
}
return bootstrapServerHost
}
func getDisplayName(instanceType string, config *config.KafkaConfig) (string, *errors.ServiceError) {
if config != nil && strings.Trim(instanceType, " ") != "" {
kafkaInstanceType, err := config.SupportedInstanceTypes.Configuration.GetKafkaInstanceTypeByID(instanceType)
if err != nil {
return "", errors.NewWithCause(errors.ErrorGeneral, err, "unable to get kafka display name for '%s' instance type", instanceType)
}
return kafkaInstanceType.DisplayName, nil
}
return "", nil
}