/
providers.go
293 lines (245 loc) · 9.64 KB
/
providers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package config
import (
"errors"
"fmt"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/cloudproviders"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/kafkas/types"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/environments"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
errs "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors"
)
// InstanceType is a named type whose underlying type is
// types.KafkaInstanceType. Region.IsInstanceTypeSupported converts it to a
// string to match it against the keys of an InstanceTypeMap.
type InstanceType types.KafkaInstanceType
// InstanceTypeMap maps an instance type name to the configuration that applies
// to that instance type.
type InstanceTypeMap map[string]InstanceTypeConfig

// InstanceTypeConfig holds the settings of a single instance type.
type InstanceTypeConfig struct {
	// Limit is the optional limit configured for the instance type. It is nil
	// when the configuration does not set one.
	Limit *int `yaml:"limit"`
	// Minimum capacity in number of kafka streaming units that should be
	// available (free) at any given moment for a supported instance type in a
	// region. If not provided, its default value is 0 which means that there is
	// no minimum available capacity required.
	// Used for dynamic scaling evaluation.
	MinAvailableCapacitySlackStreamingUnits int `yaml:"min_available_capacity_slack_streaming_units"`
}

// AsSlice returns the instance type names (the map keys) as a slice.
//
// The returned slice is never nil, even for a nil or empty map. Its order
// follows map iteration order and is therefore unspecified.
func (itl InstanceTypeMap) AsSlice() []string {
	// The final length is known up front, so allocate the backing array once.
	instanceTypeList := make([]string, 0, len(itl))
	for k := range itl {
		instanceTypeList = append(instanceTypeList, k)
	}
	return instanceTypeList
}
// Region describes a single region entry of a provider configuration.
type Region struct {
// Name is the region name; RegionList.GetByName matches on it.
Name string `yaml:"name"`
// Default marks the region as the default one. Provider.Validate requires
// exactly one default region per provider.
Default bool `yaml:"default"`
// SupportedInstanceTypes maps each instance type name supported in the
// region to its configuration.
SupportedInstanceTypes InstanceTypeMap `yaml:"supported_instance_type"`
}
// IsInstanceTypeSupported reports whether instanceType is one of the keys of
// the region's SupportedInstanceTypes map.
func (r Region) IsInstanceTypeSupported(instanceType InstanceType) bool {
	// A direct key lookup replaces scanning every key of the map; indexing a
	// nil map is safe and simply reports the key as absent.
	_, supported := r.SupportedInstanceTypes[string(instanceType)]
	return supported
}
// getLimitSetForInstanceTypeInRegion looks up the instance type named t in the
// region and returns its configured Limit, which may be nil. When t is not a
// key of SupportedInstanceTypes it returns the service error built by
// errs.InstanceTypeNotSupported.
func (r Region) getLimitSetForInstanceTypeInRegion(t string) (*int, *errs.ServiceError) {
	instanceTypeConfig, supported := r.SupportedInstanceTypes[t]
	if !supported {
		return nil, errs.InstanceTypeNotSupported(fmt.Sprintf("instance type: %s is not supported", t))
	}
	return instanceTypeConfig.Limit, nil
}
// Validate checks the instance type settings of the region against the data
// plane cluster configuration.
//
// For every supported instance type it verifies that the minimum available
// capacity slack is not negative and, when a non-zero limit is set, that the
// slack does not exceed that limit. Instance types without a limit, or with a
// limit of 0, are not checked any further.
//
// When data plane manual scaling is enabled the limits are additionally
// compared with the capacities reported by the cluster configuration:
//   - with a single supported instance type, the limit must equal the capacity
//     reported for that instance type in the region;
//   - otherwise each limit must lie between the min and max capacity reported
//     for its instance type and, when every instance type has a non-zero
//     limit, the limits must add up to the capacity of the region.
//
// It returns the first violation found, or nil. Instance types are visited in
// map iteration order, so which violation is reported first is unspecified
// when there are several.
func (r Region) Validate(dataplaneClusterConfig *DataplaneClusterConfig) error {
	counter := 1
	totalCapacityUsed := 0
	regionCapacity := dataplaneClusterConfig.ClusterConfig.GetCapacityForRegion(r.Name)
	// Whether every instance type has a non-zero limit does not change while
	// validating, so it is evaluated once instead of on every iteration.
	allLimitsSet := !r.RegionHasZeroOrNoLimitInstanceType()

	// verify that Limits set in this configuration matches the capacity of clusters listed in the data plane configuration
	for regionInstanceTypeName, regionInstanceType := range r.SupportedInstanceTypes {
		if regionInstanceType.MinAvailableCapacitySlackStreamingUnits < 0 {
			return fmt.Errorf("kafka minimum available capacity slack for instance type '%s' in region '%s' cannot be negative", regionInstanceTypeName, r.Name)
		}

		// skip if limit is not set or is explicitly set to 0
		if regionInstanceType.Limit == nil || *regionInstanceType.Limit == 0 {
			continue
		}
		// The limit is known to be set and non-zero from here on.
		limit := *regionInstanceType.Limit

		if regionInstanceType.MinAvailableCapacitySlackStreamingUnits > limit {
			return fmt.Errorf("configured kafka minimum available capacity slack '%d' for instance type '%s' in region '%s' cannot be bigger than its region limit '%v'", regionInstanceType.MinAvailableCapacitySlackStreamingUnits, regionInstanceTypeName, r.Name, limit)
		}

		// validate instance type limits with the data plane cluster configuration when manual scaling is enabled
		if !dataplaneClusterConfig.IsDataPlaneManualScalingEnabled() {
			continue
		}

		if len(r.SupportedInstanceTypes) == 1 {
			capacity := dataplaneClusterConfig.ClusterConfig.GetCapacityForRegionAndInstanceType(r.Name, regionInstanceTypeName, false)
			if limit != capacity {
				return fmt.Errorf("limit for instance type '%s'(%d) does not match the capacity in region %s(%d)", regionInstanceTypeName, limit, r.Name, capacity)
			}
			// The only instance type has been checked; nothing is left to validate.
			return nil
		}

		// ensure that limit is within min and max capacity
		// min: the total capacity of clusters that support only this instance type
		// max: the total capacity of clusters that supports this instance type
		minCapacity := dataplaneClusterConfig.ClusterConfig.GetCapacityForRegionAndInstanceType(r.Name, regionInstanceTypeName, true)
		maxCapacity := dataplaneClusterConfig.ClusterConfig.GetCapacityForRegionAndInstanceType(r.Name, regionInstanceTypeName, false)
		if minCapacity > limit || maxCapacity < limit {
			return fmt.Errorf("limit for %s instance type (%d) does not match cluster capacity configuration in region '%s': min(%d), max(%d)", regionInstanceTypeName, limit, r.Name, minCapacity, maxCapacity)
		}

		// when all limits are set, ensure its total adds up to total capacity of the region.
		if allLimitsSet {
			totalCapacityUsed += limit
			// when we reach the last item, ensure limits for all instance types adds up to the total capacity of the region
			if counter == len(r.SupportedInstanceTypes) && totalCapacityUsed != regionCapacity {
				return fmt.Errorf("total limits set in region '%s' does not match cluster capacity configuration", r.Name)
			}
		}
		// counter only advances for instance types that reach this point; when
		// all limits are set no iteration is skipped, so it equals the map
		// length exactly on the last one.
		counter++
	}
	return nil
}
// RegionHasZeroOrNoLimitInstanceType reports whether at least one supported
// instance type of the region has no limit set or a limit of 0. It returns
// false when the region has no supported instance types.
func (r Region) RegionHasZeroOrNoLimitInstanceType() bool {
	for _, instanceTypeConfig := range r.SupportedInstanceTypes {
		switch {
		case instanceTypeConfig.Limit == nil:
			return true
		case *instanceTypeConfig.Limit == 0:
			return true
		}
	}
	return false
}
// RegionList is a list of Region values.
type RegionList []Region
// GetByName returns a copy of the first region in the list whose Name equals
// regionName, together with true. When no region matches it returns the zero
// Region and false.
func (rl RegionList) GetByName(regionName string) (Region, bool) {
	for i := range rl {
		if rl[i].Name != regionName {
			continue
		}
		return rl[i], true
	}
	return Region{}, false
}
// String returns the region names, in list order, formatted by fmt.Sprint as
// a slice of strings (for example "[a b]", or "[]" for an empty list).
func (rl RegionList) String() string {
	regionNames := make([]string, len(rl))
	for i := range rl {
		regionNames[i] = rl[i].Name
	}
	return fmt.Sprint(regionNames)
}
// Provider describes a single cloud provider entry of the providers
// configuration.
type Provider struct {
// Name is the provider name. Provider.Validate parses it with
// cloudproviders.ParseCloudProviderID and rejects names that are not known.
Name string `yaml:"name"`
// Default marks the provider as the default one. ProviderConfig.Validate
// requires exactly one default provider.
Default bool `yaml:"default"`
// Regions lists the regions configured for the provider.
Regions RegionList `yaml:"regions"`
}
// ProviderList is a list of Provider values.
type ProviderList []Provider
// GetByName returns a copy of the first provider in the list whose Name equals
// providerName, together with true. When no provider matches it returns the
// zero Provider and false.
func (pl ProviderList) GetByName(providerName string) (Provider, bool) {
	for i := range pl {
		if pl[i].Name != providerName {
			continue
		}
		return pl[i], true
	}
	return Provider{}, false
}
// String returns the provider names, in list order, formatted by fmt.Sprint
// as a slice of strings (for example "[a b]", or "[]" for an empty list).
func (pl ProviderList) String() string {
	providerNames := make([]string, len(pl))
	for i := range pl {
		providerNames[i] = pl[i].Name
	}
	return fmt.Sprint(providerNames)
}
// ProviderConfiguration is the top-level document decoded from the providers
// configuration file by readFileProvidersConfig.
type ProviderConfiguration struct {
// SupportedProviders lists the configured providers.
SupportedProviders ProviderList `yaml:"supported_providers"`
}
// ProviderConfig bundles the decoded providers configuration with the path of
// the file it is read from.
type ProviderConfig struct {
// ProvidersConfig is populated by ReadFiles from ProvidersConfigFile.
ProvidersConfig ProviderConfiguration
// ProvidersConfigFile is the path of the providers configuration file. It
// can be overridden with the "providers-config-file" flag registered by
// AddFlags.
ProvidersConfigFile string
}
// NewSupportedProvidersConfig returns a ProviderConfig whose configuration
// file path is set to "config/provider-configuration.yaml" and whose
// ProvidersConfig is left at its zero value.
func NewSupportedProvidersConfig() *ProviderConfig {
	providerConfig := new(ProviderConfig)
	providerConfig.ProvidersConfigFile = "config/provider-configuration.yaml"
	return providerConfig
}
// Compile-time assertion that *ProviderConfig implements
// environments.ServiceValidator.
var _ environments.ServiceValidator = &ProviderConfig{}
// Validate resolves the DataplaneClusterConfig from env and validates every
// supported provider against it, returning the first provider error
// encountered. It then requires that exactly one provider is marked as
// default and returns an error otherwise.
func (c *ProviderConfig) Validate(env *environments.Env) error {
	var dataplaneClusterConfig *DataplaneClusterConfig
	env.MustResolve(&dataplaneClusterConfig)

	defaultProviders := 0
	for _, supportedProvider := range c.ProvidersConfig.SupportedProviders {
		err := supportedProvider.Validate(dataplaneClusterConfig)
		if err != nil {
			return err
		}
		if !supportedProvider.Default {
			continue
		}
		defaultProviders++
	}

	if defaultProviders == 1 {
		return nil
	}
	return fmt.Errorf("expected 1 default provider in provider list, got %d", defaultProviders)
}
// Validate checks a single provider entry against the data plane cluster
// configuration. It returns an error when:
//   - the provider name does not parse to a known cloud provider;
//   - data plane auto scaling is enabled and
//     DefaultComputeMachinesConfig fails for the provider;
//   - any of the provider's regions fails Region.Validate;
//   - the provider does not have exactly one default region.
func (provider Provider) Validate(dataplaneClusterConfig *DataplaneClusterConfig) error {
	knownCloudProviders := cloudproviders.KnownCloudProviders()
	cloudProviderID := cloudproviders.ParseCloudProviderID(provider.Name)
	if !knownCloudProviders.Contains(cloudProviderID) {
		return fmt.Errorf("cloud Provider '%s' is not a recognized Cloud Provider", cloudProviderID)
	}

	// verify that machine type configuration are there during dynamic scaling mode
	if dataplaneClusterConfig.IsDataPlaneAutoScalingEnabled() {
		if _, err := dataplaneClusterConfig.DefaultComputeMachinesConfig(cloudProviderID); err != nil {
			return err
		}
	}

	// verify that there is only one default region
	defaultRegions := 0
	for _, region := range provider.Regions {
		if region.Default {
			defaultRegions++
		}
		err := region.Validate(dataplaneClusterConfig)
		if err != nil {
			return err
		}
	}

	if defaultRegions == 1 {
		return nil
	}
	return fmt.Errorf("expected 1 default region in provider %s, got %d", provider.Name, defaultRegions)
}
// AddFlags registers the "providers-config-file" string flag on fs. The flag
// writes into ProvidersConfigFile and defaults to its current value.
func (c *ProviderConfig) AddFlags(fs *pflag.FlagSet) {
	const (
		flagName  = "providers-config-file"
		flagUsage = "SupportedProviders configuration file"
	)
	fs.StringVar(&c.ProvidersConfigFile, flagName, c.ProvidersConfigFile, flagUsage)
}
// ReadFiles loads the file at ProvidersConfigFile into ProvidersConfig and
// returns any error reported by readFileProvidersConfig.
func (c *ProviderConfig) ReadFiles() error {
	target := &c.ProvidersConfig
	return readFileProvidersConfig(c.ProvidersConfigFile, target)
}
// GetInstanceLimit returns the limit configured for instanceType in the given
// region of the given provider. The returned limit may be nil when the
// instance type has no limit set.
//
// It returns a service error when the provider is not in the supported
// providers list, when the region is not configured for that provider, or
// when the instance type is not supported in that region.
func (c *ProviderConfig) GetInstanceLimit(region string, providerName string, instanceType string) (*int, *errs.ServiceError) {
	provider, providerFound := c.ProvidersConfig.SupportedProviders.GetByName(providerName)
	if !providerFound {
		return nil, errs.ProviderNotSupported(fmt.Sprintf("cloud provider '%s' is unsupported", providerName))
	}

	reg, regionFound := provider.Regions.GetByName(region)
	if !regionFound {
		return nil, errs.RegionNotSupported(fmt.Sprintf("'%s' region in '%s' cloud provider is unsupported", region, providerName))
	}

	return reg.getLimitSetForInstanceTypeInRegion(instanceType)
}
// readFileProvidersConfig reads file through shared.ReadFile and decodes its
// contents into val with yaml.UnmarshalStrict. It returns the read error
// unchanged when reading fails, otherwise the result of the decoding.
func readFileProvidersConfig(file string, val *ProviderConfiguration) error {
	contents, readErr := shared.ReadFile(file)
	if readErr != nil {
		return readErr
	}
	raw := []byte(contents)
	return yaml.UnmarshalStrict(raw, val)
}
// GetDefault returns a copy of the first provider in the list that is marked
// as default. It returns the zero Provider and an error when no provider in
// the list is marked as default.
//
// The receiver is named pl to match the other ProviderList methods.
func (pl ProviderList) GetDefault() (Provider, error) {
	for _, p := range pl {
		if p.Default {
			return p, nil
		}
	}
	return Provider{}, errors.New("no default provider found in list of supported providers")
}
// GetDefaultRegion returns a copy of the first region of the provider that is
// marked as default. It returns the zero Region and an error naming the
// provider when none of its regions is marked as default.
func (provider Provider) GetDefaultRegion() (Region, error) {
	regions := provider.Regions
	for i := range regions {
		if !regions[i].Default {
			continue
		}
		return regions[i], nil
	}
	return Region{}, fmt.Errorf("no default region found for provider %s", provider.Name)
}
// IsRegionSupported reports whether the provider has a region whose Name
// equals regionName.
func (provider Provider) IsRegionSupported(regionName string) bool {
	for _, region := range provider.Regions {
		if region.Name == regionName {
			return true
		}
	}
	return false
}