Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed default value from capacity.json, assigning default value for… #760

Merged
merged 10 commits into from
Feb 7, 2022
73 changes: 70 additions & 3 deletions pkg/resources/cruisecontrol/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger,
// If there is already a config added manually, use that one
if kafkaCluster.Spec.CruiseControlConfig.CapacityConfig != "" {
userProvidedCapacityConfig := kafkaCluster.Spec.CruiseControlConfig.CapacityConfig
updatedProvidedCapacityConfig, err := extendUserProvidedCapacityConfig([]byte(userProvidedCapacityConfig), kafkaCluster, log)
stoader marked this conversation as resolved.
Show resolved Hide resolved
if updatedProvidedCapacityConfig != "" {
return updatedProvidedCapacityConfig, err
}
return userProvidedCapacityConfig, err
}
// During cluster downscale the CR does not contain data for brokers being downscaled which is
Expand All @@ -157,6 +161,21 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger,
}
}

capacityConfig := CapacityConfig{}
BrokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, nil)
if err != nil {
return "", err
}
capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, BrokerCapacities...)
result, err := json.MarshalIndent(capacityConfig, "", " ")
if err != nil {
log.Error(err, "Could not marshal cruise control capacity config")
stoader marked this conversation as resolved.
Show resolved Hide resolved
}
log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result))
return string(result), err
}

func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, presentBrokerIds []string) ([]BrokerCapacity, error) {
capacityConfig := CapacityConfig{}

brokerIdFromStatus := make([]string, 0, len(kafkaCluster.Status.BrokersState))
Expand All @@ -165,6 +184,20 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger,
}
sort.Strings(brokerIdFromStatus)
stoader marked this conversation as resolved.
Show resolved Hide resolved

RemovePresentBrokerIds:
for _, presentBrokerId := range presentBrokerIds {
Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
for i, statusBrokerId := range brokerIdFromStatus {
if statusBrokerId == presentBrokerId {
brokerIdFromStatus = append(brokerIdFromStatus[:i], brokerIdFromStatus[i+1:]...)
continue RemovePresentBrokerIds
}
}
}

if len(brokerIdFromStatus) == 0 {
return []BrokerCapacity{}, nil
}

for _, brokerId := range brokerIdFromStatus {
stoader marked this conversation as resolved.
Show resolved Hide resolved
brokerCapacity := BrokerCapacity{}
brokerFoundInSpec := false
Expand All @@ -173,7 +206,7 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger,
brokerFoundInSpec = true
brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log)
if err != nil {
return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id)
return []BrokerCapacity{}, errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id)
Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
}
brokerCapacity = BrokerCapacity{
BrokerID: strconv.Itoa(int(broker.Id)),
Expand All @@ -198,13 +231,47 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger,

capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity)
}
return capacityConfig.BrokerCapacities, nil
}

result, err := json.MarshalIndent(capacityConfig, "", " ")
func extendUserProvidedCapacityConfig(data []byte, kafkaCluster *v1beta1.KafkaCluster, log logr.Logger) (string, error) {
stoader marked this conversation as resolved.
Show resolved Hide resolved
var presentBrokerIds []string
config := JBODInvariantCapacityConfig{}
err := json.Unmarshal(data, &config)
if err != nil {
log.Info("could not unmarshal the user-provided broker capacity config")
stoader marked this conversation as resolved.
Show resolved Hide resolved
return "", errors.Wrap(err, "could not unmarshal the user-provided broker capacity config")
}
for _, brokerConfig := range config.Capacities {
brokerConfigMap, ok := brokerConfig.(map[string]interface{})
if !ok {
continue
}
var brokerId interface{}
stoader marked this conversation as resolved.
Show resolved Hide resolved
brokerId, ok = brokerConfigMap["brokerId"]
stoader marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
continue
} else {
stoader marked this conversation as resolved.
Show resolved Hide resolved
presentBrokerIds = append(presentBrokerIds, fmt.Sprint(brokerId))
Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
}
if brokerId == "-1" {
return "", nil
}
}

//Addig generated values to all Brokers not already present in the list
BrokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, presentBrokerIds)
stoader marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "", err
}
for _, brokerCapacity := range BrokerCapacities {
config.Capacities = append(config.Capacities, brokerCapacity)
}
result, err := json.MarshalIndent(config, "", " ")
if err != nil {
log.Error(err, "Could not marshal cruise control capacity config")
}
log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result))

return string(result), err
}

Expand Down
67 changes: 66 additions & 1 deletion pkg/resources/cruisecontrol/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ func TestReturnErrorStorageConfigLessThan1MB(t *testing.T) {
//nolint:funlen
func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) {
cpuQuantity, _ := resource.ParseQuantity("2000m")

testCases := []struct {
testName string
capacityConfig string
Expand Down Expand Up @@ -681,6 +680,26 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) {
"NW_OUT": "50000"
},
"doc": "This overrides the capacity for broker 1. This broker is a JBOD broker."
},
{
"brokerId": "2",
"capacity": {
"DISK": {},
"CPU": "200",
"NW_IN": "125000",
"NW_OUT": "125000"
},
"doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
},
{
"brokerId": "4",
"capacity": {
"DISK": {},
"CPU": "200",
"NW_IN": "125000",
"NW_OUT": "125000"
},
"doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
}
]
}`,
Expand Down Expand Up @@ -809,6 +828,26 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) {
"NW_OUT": "50000"
},
"doc": "This overrides the capacity for broker 1. This broker is a JBOD broker."
},
{
"brokerId": "2",
"capacity": {
"DISK": {},
"CPU": "200",
"NW_IN": "125000",
"NW_OUT": "125000"
},
"doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
},
{
"brokerId": "4",
"capacity": {
"DISK": {},
"CPU": "200",
"NW_IN": "125000",
"NW_OUT": "125000"
},
"doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
}
]
}`,
Expand Down Expand Up @@ -908,18 +947,44 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) {
},
{
Id: 1,
BrokerConfig: &v1beta1.BrokerConfig{
Resources: &v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": cpuQuantity,
}},
},
},
{
Id: 2,
BrokerConfig: &v1beta1.BrokerConfig{
Resources: &v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": cpuQuantity,
}},
},
},
{
Id: 4,
BrokerConfig: &v1beta1.BrokerConfig{
Resources: &v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": cpuQuantity,
}},
},
},
},
CruiseControlConfig: v1beta1.CruiseControlConfig{
CapacityConfig: test.capacityConfig,
},
},
Status: v1beta1.KafkaClusterStatus{
BrokersState: map[string]v1beta1.BrokerState{
"0": {},
"1": {},
"2": {},
"4": {},
},
},
}
var actual JBODInvariantCapacityConfig
rawStringActual, _ := GenerateCapacityConfig(&kafkaCluster, logr.Discard(), nil)
Expand Down