From ace40575a374d6fe6eb8b0359a1cd5e170cb770b Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Tue, 1 Feb 2022 13:56:04 +0100 Subject: [PATCH 1/8] Removed default value from capacity.json, assigning default value for each broker now. --- ...kacluster_controller_cruisecontrol_test.go | 12 -- pkg/resources/cruisecontrol/configmap.go | 103 +++++++----------- pkg/resources/cruisecontrol/configmap_test.go | 82 +++++--------- 3 files changed, 68 insertions(+), 129 deletions(-) diff --git a/controllers/tests/kafkacluster_controller_cruisecontrol_test.go b/controllers/tests/kafkacluster_controller_cruisecontrol_test.go index 5711904a1..c1b4030e9 100644 --- a/controllers/tests/kafkacluster_controller_cruisecontrol_test.go +++ b/controllers/tests/kafkacluster_controller_cruisecontrol_test.go @@ -151,18 +151,6 @@ zookeeper.connect=/ "NW_OUT": "125000" }, "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." - }, - { - "brokerId": "-1", - "capacity": { - "DISK": { - "/kafka-logs/kafka": "10737" - }, - "CPU": "100", - "NW_IN": "125000", - "NW_OUT": "125000" - }, - "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`)) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 4a83b75d7..92b38ea5f 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -17,6 +17,7 @@ package cruisecontrol import ( "encoding/json" "fmt" + "sort" "strconv" "emperror.dev/errors" @@ -145,10 +146,6 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, // If there is already a config added manually, use that one if kafkaCluster.Spec.CruiseControlConfig.CapacityConfig != "" { userProvidedCapacityConfig := kafkaCluster.Spec.CruiseControlConfig.CapacityConfig - updatedProvidedCapacityConfig := ensureContainsDefaultBrokerCapacity([]byte(userProvidedCapacityConfig), log) - if updatedProvidedCapacityConfig != nil { - return string(updatedProvidedCapacityConfig), err - } return userProvidedCapacityConfig, err } // During cluster downscale the CR does not contain data for brokers being downscaled which is @@ -162,30 +159,50 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, capacityConfig := CapacityConfig{} - for _, broker := range kafkaCluster.Spec.Brokers { - brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log) - if err != nil { - return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) - } - brokerCapacity := BrokerCapacity{ - BrokerID: fmt.Sprintf("%d", broker.Id), - Capacity: Capacity{ - DISK: brokerDisks, - CPU: generateBrokerCPU(broker, kafkaCluster.Spec, log), - NWIN: generateBrokerNetworkIn(broker, kafkaCluster.Spec, log), - NWOUT: generateBrokerNetworkOut(broker, kafkaCluster.Spec, log), - }, - Doc: defaultDoc, + var strkeys []string + for key := range kafkaCluster.Status.BrokersState { + strkeys = append(strkeys, key) + } + sort.Strings(strkeys) + + for _, brokerId := range strkeys { + brokerFound := false + for _, broker := range kafkaCluster.Spec.Brokers { + if brokerId == fmt.Sprintf("%d", broker.Id) { + brokerFound = true + brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log) + if err != nil { + return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) + } + brokerCapacity := BrokerCapacity{ + BrokerID: fmt.Sprintf("%d", 
broker.Id), + Capacity: Capacity{ + DISK: brokerDisks, + CPU: generateBrokerCPU(broker, kafkaCluster.Spec, log), + NWIN: generateBrokerNetworkIn(broker, kafkaCluster.Spec, log), + NWOUT: generateBrokerNetworkOut(broker, kafkaCluster.Spec, log), + }, + Doc: defaultDoc, + } + + log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) + + capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) + } } + // When removing a broker it still needs to have values assigned in capacity config + // although it doesn't really matter what the values are, so we are setting defaults + // here, this way we don't have to deal with a universal default. + if !brokerFound { + log.Info("Broker spec not found, using default fallback") + brokerCapacity := generateDefaultBrokerCapacityWithId(brokerId) - log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) + log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) + capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) + } } - // adding default broker capacity config - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, generateDefaultBrokerCapacity()) - result, err := json.MarshalIndent(capacityConfig, "", " ") if err != nil { log.Error(err, "Could not marshal cruise control capacity config") @@ -195,45 +212,9 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, return string(result), err } -func ensureContainsDefaultBrokerCapacity(data []byte, log logr.Logger) []byte { - config := JBODInvariantCapacityConfig{} - err := json.Unmarshal(data, &config) - if err != nil { - log.Info("could not unmarshal the user-provided broker capacity config") - return nil - } - for _, brokerConfig := range config.Capacities { - brokerConfigMap, ok := brokerConfig.(map[string]interface{}) - if !ok { - continue - } - var brokerId interface{} - brokerId, ok = brokerConfigMap["brokerId"] - if !ok { - continue - } - if brokerId == "-1" { - return nil - } - } - - // should add default broker capacity config - config.Capacities = append(config.Capacities, generateDefaultBrokerCapacity()) - - result, err := json.MarshalIndent(config, "", " ") - if err != nil { - log.Info("could not marshal the modified broker capacity config") - return nil - } - return result -} - -// generates default broker capacity -// the exact values do not matter as for every broker it is set explicitly above -// upon broker deletion CruiseControl does not use these value, it requires to detect that a broker exists -func generateDefaultBrokerCapacity() BrokerCapacity { +func generateDefaultBrokerCapacityWithId(brokerId string) BrokerCapacity { return BrokerCapacity{ - BrokerID: "-1", + BrokerID: brokerId, Capacity: Capacity{ DISK: map[string]string{ "/kafka-logs/kafka": "10737", diff --git a/pkg/resources/cruisecontrol/configmap_test.go b/pkg/resources/cruisecontrol/configmap_test.go index 10425d5e3..e8bf39316 100644 --- a/pkg/resources/cruisecontrol/configmap_test.go +++ b/pkg/resources/cruisecontrol/configmap_test.go @@ -197,6 +197,14 @@ func TestGenerateCapacityConfig_JBOD(t *testing.T) { }, }, }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + "1": {}, + "2": {}, + "3": {}, + }, + }, }, expectedConfiguration: ` { @@ -249,18 +257,6 @@ func 
TestGenerateCapacityConfig_JBOD(t *testing.T) { "NW_OUT": "125000" }, "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." - }, - { - "brokerId": "-1", - "capacity": { - "DISK": { - "/kafka-logs/kafka": "10737" - }, - "CPU": "100", - "NW_IN": "125000", - "NW_OUT": "125000" - }, - "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`, @@ -292,6 +288,11 @@ func TestGenerateCapacityConfig_JBOD(t *testing.T) { }, }, }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + }, + }, }, expectedConfiguration: ` { @@ -307,18 +308,6 @@ func TestGenerateCapacityConfig_JBOD(t *testing.T) { "NW_OUT": "125000" }, "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." - }, - { - "brokerId": "-1", - "capacity": { - "DISK": { - "/kafka-logs/kafka": "10737" - }, - "CPU": "100", - "NW_IN": "125000", - "NW_OUT": "125000" - }, - "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`, @@ -379,6 +368,14 @@ func TestGenerateCapacityConfig_JBOD(t *testing.T) { }, }, }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + "1": {}, + "2": {}, + "3": {}, + }, + }, }, expectedConfiguration: `{ "brokerCapacities": [ @@ -429,18 +426,6 @@ func TestGenerateCapacityConfig_JBOD(t *testing.T) { "NW_OUT": "200" }, "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." - }, - { - "brokerId": "-1", - "capacity": { - "DISK": { - "/kafka-logs/kafka": "10737" - }, - "CPU": "100", - "NW_IN": "125000", - "NW_OUT": "125000" - }, - "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`, @@ -503,6 +488,11 @@ func TestReturnErrorStorageConfigLessThan1MB(t *testing.T) { }, }, }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + }, + }, } _, err := GenerateCapacityConfig(&kafkaCluster, logr.Discard(), nil) @@ -570,16 +560,6 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { "NW_OUT": "50000" }, "doc": "This overrides the capacity for broker 1. This broker is a JBOD broker." - }, - { - "brokerId": "-1", - "capacity": { - "DISK": {"/kafka-logs/kafka": "10737"}, - "CPU": "100", - "NW_IN": "125000", - "NW_OUT": "125000" - }, - "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`, @@ -708,16 +688,6 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { "NW_OUT": "50000" }, "doc": "This overrides the capacity for broker 1. This broker is a JBOD broker." - }, - { - "brokerId": "-1", - "capacity": { - "DISK": {"/kafka-logs/kafka": "10737"}, - "CPU": "100", - "NW_IN": "125000", - "NW_OUT": "125000" - }, - "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
} ] }`, From dd76980ff76ea2c90771789e7d21c0e6178b3514 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Wed, 2 Feb 2022 11:34:46 +0100 Subject: [PATCH 2/8] Refactored code based on comments --- pkg/resources/cruisecontrol/configmap.go | 28 +++++++++++------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 92b38ea5f..3282f31ff 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -159,23 +159,25 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, capacityConfig := CapacityConfig{} - var strkeys []string + brokerIdFromStatus := make([]string, 0, len(kafkaCluster.Status.BrokersState)) for key := range kafkaCluster.Status.BrokersState { - strkeys = append(strkeys, key) + brokerIdFromStatus = append(brokerIdFromStatus, key) } - sort.Strings(strkeys) + sort.Strings(brokerIdFromStatus) + log.Info("String keys", "brokerIdFromStatus", brokerIdFromStatus) - for _, brokerId := range strkeys { + for _, brokerId := range brokerIdFromStatus { + brokerCapacity := BrokerCapacity{} brokerFound := false for _, broker := range kafkaCluster.Spec.Brokers { - if brokerId == fmt.Sprintf("%d", broker.Id) { + if brokerId == strconv.Itoa(int(broker.Id)) { brokerFound = true brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log) if err != nil { return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) } - brokerCapacity := BrokerCapacity{ - BrokerID: fmt.Sprintf("%d", broker.Id), + brokerCapacity = BrokerCapacity{ + BrokerID: strconv.Itoa(int(broker.Id)), Capacity: Capacity{ DISK: brokerDisks, CPU: generateBrokerCPU(broker, kafkaCluster.Spec, log), @@ -184,10 +186,6 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, }, Doc: defaultDoc, } - - log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) - - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) } } // When removing a broker it still needs to have values assigned in capacity config @@ -195,12 +193,12 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, // here, this way we don't have to deal with a universal default. 
if !brokerFound { log.Info("Broker spec not found, using default fallback") - brokerCapacity := generateDefaultBrokerCapacityWithId(brokerId) + brokerCapacity = generateDefaultBrokerCapacityWithId(brokerId) - log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) - - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) } + log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) + + capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) } result, err := json.MarshalIndent(capacityConfig, "", " ") From 2f55eaac1255d93850691b96151d3ffce783e7a8 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Wed, 2 Feb 2022 11:44:40 +0100 Subject: [PATCH 3/8] Corrected linter error --- pkg/resources/cruisecontrol/configmap.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 3282f31ff..cc27add91 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -194,7 +194,6 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, if !brokerFound { log.Info("Broker spec not found, using default fallback") brokerCapacity = generateDefaultBrokerCapacityWithId(brokerId) - } log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) From 616b8bfa0739c9c4c77a133b5487f9b8e01b9ce0 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Wed, 2 Feb 2022 13:40:16 +0100 Subject: [PATCH 4/8] Added unit test requested in reviews and small changes --- pkg/resources/cruisecontrol/configmap.go | 11 +- pkg/resources/cruisecontrol/configmap_test.go | 121 ++++++++++++++++++ 2 files changed, 126 insertions(+), 6 deletions(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index cc27add91..fcef7f867 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -160,18 +160,17 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, capacityConfig := CapacityConfig{} brokerIdFromStatus := make([]string, 0, len(kafkaCluster.Status.BrokersState)) - for key := range kafkaCluster.Status.BrokersState { - brokerIdFromStatus = append(brokerIdFromStatus, key) + for brokerId := range kafkaCluster.Status.BrokersState { + brokerIdFromStatus = append(brokerIdFromStatus, brokerId) } sort.Strings(brokerIdFromStatus) - log.Info("String keys", "brokerIdFromStatus", brokerIdFromStatus) for _, brokerId := range brokerIdFromStatus { brokerCapacity := BrokerCapacity{} - brokerFound := false + brokerFoundInSpec := false for _, broker := range kafkaCluster.Spec.Brokers { if brokerId == strconv.Itoa(int(broker.Id)) { - brokerFound = true + brokerFoundInSpec = true brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log) if err != nil { return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) @@ -191,7 +190,7 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, // When removing a broker it still needs to have values assigned in capacity config // although it doesn't really matter what the values are, so we are setting defaults // here, this way we don't have to deal with a universal default. 
- if !brokerFound { + if !brokerFoundInSpec { log.Info("Broker spec not found, using default fallback") brokerCapacity = generateDefaultBrokerCapacityWithId(brokerId) } diff --git a/pkg/resources/cruisecontrol/configmap_test.go b/pkg/resources/cruisecontrol/configmap_test.go index e8bf39316..351681fce 100644 --- a/pkg/resources/cruisecontrol/configmap_test.go +++ b/pkg/resources/cruisecontrol/configmap_test.go @@ -261,6 +261,127 @@ func TestGenerateCapacityConfig_JBOD(t *testing.T) { ] }`, }, + { + testName: "generate correct capacity config when there is a broker missing from spec but present in status", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ + "default": { + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/path-from-default", + PvcSpec: &v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: quantity, + }, + }, + }, + }, + }, + }, + }, + Brokers: []v1beta1.Broker{ + { + Id: 0, + BrokerConfigGroup: "default", + BrokerConfig: &v1beta1.BrokerConfig{ + Resources: &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": cpuQuantity, + }}, + }, + }, + { + Id: 1, + BrokerConfigGroup: "default", + }, + { + Id: 2, + BrokerConfigGroup: "default", + }, + { + Id: 3, + BrokerConfigGroup: "default", + }, + }, + }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + "1": {}, + "2": {}, + "3": {}, + "4": {}, + }, + }, + }, + expectedConfiguration: ` + { + "brokerCapacities": [ + { + "brokerId": "0", + "capacity": { + "DISK": { + "/path-from-default/kafka": "10737" + }, + "CPU": "200", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." + }, + { + "brokerId": "1", + "capacity": { + "DISK": { + "/path-from-default/kafka": "10737" + }, + "CPU": "150", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." + }, + { + "brokerId": "2", + "capacity": { + "DISK": { + "/path-from-default/kafka": "10737" + }, + "CPU": "150", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." + }, + { + "brokerId": "3", + "capacity": { + "DISK": { + "/path-from-default/kafka": "10737" + }, + "CPU": "150", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." + }, + { + "brokerId": "4", + "capacity": { + "DISK": { + "/kafka-logs/kafka": "10737" + }, + "CPU": "100", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
+ } + ] + }`, + }, { testName: "generate correct capacity config when storage config is specified as 1Mi ", kafkaCluster: v1beta1.KafkaCluster{ From 2e6534ccb5a27ebbbf303f9e8894efa254e225a3 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Thu, 3 Feb 2022 18:38:52 +0100 Subject: [PATCH 5/8] Reworked user added config part --- pkg/resources/cruisecontrol/configmap.go | 73 ++++++++++++++++++- pkg/resources/cruisecontrol/configmap_test.go | 67 ++++++++++++++++- 2 files changed, 136 insertions(+), 4 deletions(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index fcef7f867..6228378d2 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -146,6 +146,10 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, // If there is already a config added manually, use that one if kafkaCluster.Spec.CruiseControlConfig.CapacityConfig != "" { userProvidedCapacityConfig := kafkaCluster.Spec.CruiseControlConfig.CapacityConfig + updatedProvidedCapacityConfig, err := extendUserProvidedCapacityConfig([]byte(userProvidedCapacityConfig), kafkaCluster, log) + if updatedProvidedCapacityConfig != "" { + return updatedProvidedCapacityConfig, err + } return userProvidedCapacityConfig, err } // During cluster downscale the CR does not contain data for brokers being downscaled which is @@ -157,6 +161,21 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, } } + capacityConfig := CapacityConfig{} + BrokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, nil) + if err != nil { + return "", err + } + capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, BrokerCapacities...) + result, err := json.MarshalIndent(capacityConfig, "", " ") + if err != nil { + log.Error(err, "Could not marshal cruise control capacity config") + } + log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result)) + return string(result), err +} + +func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, presentBrokerIds []string) ([]BrokerCapacity, error) { capacityConfig := CapacityConfig{} brokerIdFromStatus := make([]string, 0, len(kafkaCluster.Status.BrokersState)) @@ -165,6 +184,20 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, } sort.Strings(brokerIdFromStatus) +RemovePresentBrokerIds: + for _, presentBrokerId := range presentBrokerIds { + for i, statusBrokerId := range brokerIdFromStatus { + if statusBrokerId == presentBrokerId { + brokerIdFromStatus = append(brokerIdFromStatus[:i], brokerIdFromStatus[i+1:]...) 
+ continue RemovePresentBrokerIds + } + } + } + + if len(brokerIdFromStatus) == 0 { + return []BrokerCapacity{}, nil + } + for _, brokerId := range brokerIdFromStatus { brokerCapacity := BrokerCapacity{} brokerFoundInSpec := false @@ -173,7 +206,7 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, brokerFoundInSpec = true brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log) if err != nil { - return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) + return []BrokerCapacity{}, errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) } brokerCapacity = BrokerCapacity{ BrokerID: strconv.Itoa(int(broker.Id)), @@ -198,13 +231,47 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) } + return capacityConfig.BrokerCapacities, nil +} - result, err := json.MarshalIndent(capacityConfig, "", " ") +func extendUserProvidedCapacityConfig(data []byte, kafkaCluster *v1beta1.KafkaCluster, log logr.Logger) (string, error) { + var presentBrokerIds []string + config := JBODInvariantCapacityConfig{} + err := json.Unmarshal(data, &config) + if err != nil { + log.Info("could not unmarshal the user-provided broker capacity config") + return "", errors.Wrap(err, "could not unmarshal the user-provided broker capacity config") + } + for _, brokerConfig := range config.Capacities { + brokerConfigMap, ok := brokerConfig.(map[string]interface{}) + if !ok { + continue + } + var brokerId interface{} + brokerId, ok = brokerConfigMap["brokerId"] + if !ok { + continue + } else { + presentBrokerIds = append(presentBrokerIds, fmt.Sprint(brokerId)) + } + if brokerId == "-1" { + return "", nil + } + } + + //Addig generated values to all Brokers not already present in the list + BrokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, presentBrokerIds) + if err != nil { + return "", err + } + for _, brokerCapacity := range BrokerCapacities { + config.Capacities = append(config.Capacities, brokerCapacity) + } + result, err := json.MarshalIndent(config, "", " ") if err != nil { log.Error(err, "Could not marshal cruise control capacity config") } log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result)) - return string(result), err } diff --git a/pkg/resources/cruisecontrol/configmap_test.go b/pkg/resources/cruisecontrol/configmap_test.go index 351681fce..b2d8cc8a3 100644 --- a/pkg/resources/cruisecontrol/configmap_test.go +++ b/pkg/resources/cruisecontrol/configmap_test.go @@ -626,7 +626,6 @@ func TestReturnErrorStorageConfigLessThan1MB(t *testing.T) { //nolint:funlen func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { cpuQuantity, _ := resource.ParseQuantity("2000m") - testCases := []struct { testName string capacityConfig string @@ -681,6 +680,26 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { "NW_OUT": "50000" }, "doc": "This overrides the capacity for broker 1. This broker is a JBOD broker." + }, + { + "brokerId": "2", + "capacity": { + "DISK": {}, + "CPU": "200", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
+ }, + { + "brokerId": "4", + "capacity": { + "DISK": {}, + "CPU": "200", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`, @@ -809,6 +828,26 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { "NW_OUT": "50000" }, "doc": "This overrides the capacity for broker 1. This broker is a JBOD broker." + }, + { + "brokerId": "2", + "capacity": { + "DISK": {}, + "CPU": "200", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." + }, + { + "brokerId": "4", + "capacity": { + "DISK": {}, + "CPU": "200", + "NW_IN": "125000", + "NW_OUT": "125000" + }, + "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." } ] }`, @@ -908,18 +947,44 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { }, { Id: 1, + BrokerConfig: &v1beta1.BrokerConfig{ + Resources: &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": cpuQuantity, + }}, + }, }, { Id: 2, + BrokerConfig: &v1beta1.BrokerConfig{ + Resources: &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": cpuQuantity, + }}, + }, }, { Id: 4, + BrokerConfig: &v1beta1.BrokerConfig{ + Resources: &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": cpuQuantity, + }}, + }, }, }, CruiseControlConfig: v1beta1.CruiseControlConfig{ CapacityConfig: test.capacityConfig, }, }, + Status: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": {}, + "1": {}, + "2": {}, + "4": {}, + }, + }, } var actual JBODInvariantCapacityConfig rawStringActual, _ := GenerateCapacityConfig(&kafkaCluster, logr.Discard(), nil) From 7d548b4f062958ced17c65be42b9b7e4ec2572d3 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Fri, 4 Feb 2022 12:05:23 +0100 Subject: [PATCH 6/8] Fixed code based on comments --- pkg/resources/cruisecontrol/configmap.go | 113 ++++++++++------------- 1 file changed, 51 insertions(+), 62 deletions(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 6228378d2..e766bf3b5 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -141,16 +141,51 @@ type JBODInvariantCapacityConfig struct { func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, config *corev1.ConfigMap) (string, error) { var err error - log.Info("Generating capacity config") + log.Info("generating capacity config") + var capacityConfig CapacityConfig + var brokerCapacities []BrokerCapacity // If there is already a config added manually, use that one if kafkaCluster.Spec.CruiseControlConfig.CapacityConfig != "" { userProvidedCapacityConfig := kafkaCluster.Spec.CruiseControlConfig.CapacityConfig - updatedProvidedCapacityConfig, err := extendUserProvidedCapacityConfig([]byte(userProvidedCapacityConfig), kafkaCluster, log) - if updatedProvidedCapacityConfig != "" { - return updatedProvidedCapacityConfig, err + var presentBrokerIds []string + config := JBODInvariantCapacityConfig{} + err := json.Unmarshal([]byte(userProvidedCapacityConfig), &config) + if err != nil { + return "", errors.Wrap(err, "could not unmarshal the user-provided broker capacity config") + } + for _, brokerConfig := range config.Capacities { + brokerConfigMap, ok := brokerConfig.(map[string]interface{}) + if !ok { + continue + } + brokerId, ok := brokerConfigMap["brokerId"].(string) + if !ok { + continue 
+ } + presentBrokerIds = append(presentBrokerIds, brokerId) + //If the -1 default exists we don't have to do anything else here since all brokers will have values. + if brokerId == "-1" { + return userProvidedCapacityConfig, nil + } + } + + //Addig generated values to all Brokers not provided by the user. + brokerCapacities, err = appendGeneratedBrokerCapacities(kafkaCluster, log, presentBrokerIds) + if err != nil { + return "", err + } + if brokerCapacities != nil { + for _, brokerCapacity := range brokerCapacities { + config.Capacities = append(config.Capacities, brokerCapacity) + } + result, err := json.MarshalIndent(config, "", " ") + if err != nil { + return "", err + } + log.Info(fmt.Sprintf("generated capacity config was successful with values: %s", result)) + return string(result), err } - return userProvidedCapacityConfig, err } // During cluster downscale the CR does not contain data for brokers being downscaled which is // required to generate the proper capacity json for CC so we are reusing the old one. @@ -161,17 +196,18 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, } } - capacityConfig := CapacityConfig{} - BrokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, nil) + //If there was no user provided config we shall generate all configuration + brokerCapacities, err = appendGeneratedBrokerCapacities(kafkaCluster, log, nil) if err != nil { return "", err } - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, BrokerCapacities...) + + capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacities...) result, err := json.MarshalIndent(capacityConfig, "", " ") if err != nil { - log.Error(err, "Could not marshal cruise control capacity config") + return "", err } - log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result)) + log.Info(fmt.Sprintf("generated capacity config was successful with values: %s", result)) return string(result), err } @@ -184,18 +220,12 @@ func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log log } sort.Strings(brokerIdFromStatus) -RemovePresentBrokerIds: for _, presentBrokerId := range presentBrokerIds { - for i, statusBrokerId := range brokerIdFromStatus { - if statusBrokerId == presentBrokerId { - brokerIdFromStatus = append(brokerIdFromStatus[:i], brokerIdFromStatus[i+1:]...) - continue RemovePresentBrokerIds - } - } + brokerIdFromStatus = util.StringSliceRemove(brokerIdFromStatus, presentBrokerId) } if len(brokerIdFromStatus) == 0 { - return []BrokerCapacity{}, nil + return nil, nil } for _, brokerId := range brokerIdFromStatus { @@ -206,7 +236,7 @@ RemovePresentBrokerIds: brokerFoundInSpec = true brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log) if err != nil { - return []BrokerCapacity{}, errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) + return nil, errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id) } brokerCapacity = BrokerCapacity{ BrokerID: strconv.Itoa(int(broker.Id)), @@ -224,57 +254,16 @@ RemovePresentBrokerIds: // although it doesn't really matter what the values are, so we are setting defaults // here, this way we don't have to deal with a universal default. 
if !brokerFoundInSpec { - log.Info("Broker spec not found, using default fallback") + log.Info("broker spec not found, using default fallback") brokerCapacity = generateDefaultBrokerCapacityWithId(brokerId) } - log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity) + log.Info("the following brokerCapacity was generated", "brokerCapacity", brokerCapacity) capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) } return capacityConfig.BrokerCapacities, nil } -func extendUserProvidedCapacityConfig(data []byte, kafkaCluster *v1beta1.KafkaCluster, log logr.Logger) (string, error) { - var presentBrokerIds []string - config := JBODInvariantCapacityConfig{} - err := json.Unmarshal(data, &config) - if err != nil { - log.Info("could not unmarshal the user-provided broker capacity config") - return "", errors.Wrap(err, "could not unmarshal the user-provided broker capacity config") - } - for _, brokerConfig := range config.Capacities { - brokerConfigMap, ok := brokerConfig.(map[string]interface{}) - if !ok { - continue - } - var brokerId interface{} - brokerId, ok = brokerConfigMap["brokerId"] - if !ok { - continue - } else { - presentBrokerIds = append(presentBrokerIds, fmt.Sprint(brokerId)) - } - if brokerId == "-1" { - return "", nil - } - } - - //Addig generated values to all Brokers not already present in the list - BrokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, presentBrokerIds) - if err != nil { - return "", err - } - for _, brokerCapacity := range BrokerCapacities { - config.Capacities = append(config.Capacities, brokerCapacity) - } - result, err := json.MarshalIndent(config, "", " ") - if err != nil { - log.Error(err, "Could not marshal cruise control capacity config") - } - log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result)) - return string(result), err -} - func generateDefaultBrokerCapacityWithId(brokerId string) BrokerCapacity { return BrokerCapacity{ BrokerID: brokerId, From f86b203f6bf743400f581f9aa080cbaae85931c3 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Fri, 4 Feb 2022 21:57:23 +0100 Subject: [PATCH 7/8] Fixes concerning code duplication --- pkg/resources/cruisecontrol/configmap.go | 60 ++++++++----------- pkg/resources/cruisecontrol/configmap_test.go | 16 ++++- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index e766bf3b5..e7411cfbd 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -25,6 +25,7 @@ import ( "github.com/go-logr/logr" "gopkg.in/inf.v0" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "github.com/banzaicloud/koperator/api/v1alpha1" @@ -143,48 +144,34 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, log.Info("generating capacity config") - var capacityConfig CapacityConfig - var brokerCapacities []BrokerCapacity + var capacityConfig JBODInvariantCapacityConfig + var userConfigBrokerIds []string // If there is already a config added manually, use that one if kafkaCluster.Spec.CruiseControlConfig.CapacityConfig != "" { userProvidedCapacityConfig := kafkaCluster.Spec.CruiseControlConfig.CapacityConfig - var presentBrokerIds []string - config := JBODInvariantCapacityConfig{} - err := json.Unmarshal([]byte(userProvidedCapacityConfig), &config) + err := 
json.Unmarshal([]byte(userProvidedCapacityConfig), &capacityConfig) if err != nil { return "", errors.Wrap(err, "could not unmarshal the user-provided broker capacity config") } - for _, brokerConfig := range config.Capacities { - brokerConfigMap, ok := brokerConfig.(map[string]interface{}) + for _, brokerCapacity := range capacityConfig.Capacities { + brokerCapacityMap, ok := brokerCapacity.(map[string]interface{}) if !ok { continue } - brokerId, ok := brokerConfigMap["brokerId"].(string) + brokerId, ok, err := unstructured.NestedString(brokerCapacityMap, "brokerId") + if err != nil { + return "", errors.WrapIfWithDetails(err, + "could retrieve broker Id from broker capacity configuration", + "capacity configuration", brokerCapacityMap) + } if !ok { continue } - presentBrokerIds = append(presentBrokerIds, brokerId) - //If the -1 default exists we don't have to do anything else here since all brokers will have values. + // If the -1 default exists we don't have to do anything else here since all brokers will have values. if brokerId == "-1" { return userProvidedCapacityConfig, nil } - } - - //Addig generated values to all Brokers not provided by the user. - brokerCapacities, err = appendGeneratedBrokerCapacities(kafkaCluster, log, presentBrokerIds) - if err != nil { - return "", err - } - if brokerCapacities != nil { - for _, brokerCapacity := range brokerCapacities { - config.Capacities = append(config.Capacities, brokerCapacity) - } - result, err := json.MarshalIndent(config, "", " ") - if err != nil { - return "", err - } - log.Info(fmt.Sprintf("generated capacity config was successful with values: %s", result)) - return string(result), err + userConfigBrokerIds = append(userConfigBrokerIds, brokerId) } } // During cluster downscale the CR does not contain data for brokers being downscaled which is @@ -196,13 +183,14 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, } } - //If there was no user provided config we shall generate all configuration - brokerCapacities, err = appendGeneratedBrokerCapacities(kafkaCluster, log, nil) + // If there was no user provided config we shall generate all configuration or + // addig generated values to all Brokers not provided by the user. + brokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, userConfigBrokerIds) if err != nil { return "", err } - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacities...) + capacityConfig.Capacities = append(capacityConfig.Capacities, brokerCapacities...) 
result, err := json.MarshalIndent(capacityConfig, "", " ") if err != nil { return "", err @@ -211,8 +199,8 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, return string(result), err } -func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, presentBrokerIds []string) ([]BrokerCapacity, error) { - capacityConfig := CapacityConfig{} +func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, userConfigBrokerIds []string) ([]interface{}, error) { + var brokerCapacities []interface{} brokerIdFromStatus := make([]string, 0, len(kafkaCluster.Status.BrokersState)) for brokerId := range kafkaCluster.Status.BrokersState { @@ -220,8 +208,8 @@ func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log log } sort.Strings(brokerIdFromStatus) - for _, presentBrokerId := range presentBrokerIds { - brokerIdFromStatus = util.StringSliceRemove(brokerIdFromStatus, presentBrokerId) + for _, userConfigBrokerId := range userConfigBrokerIds { + brokerIdFromStatus = util.StringSliceRemove(brokerIdFromStatus, userConfigBrokerId) } if len(brokerIdFromStatus) == 0 { @@ -259,9 +247,9 @@ func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log log } log.Info("the following brokerCapacity was generated", "brokerCapacity", brokerCapacity) - capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity) + brokerCapacities = append(brokerCapacities, &brokerCapacity) } - return capacityConfig.BrokerCapacities, nil + return brokerCapacities, nil } func generateDefaultBrokerCapacityWithId(brokerId string) BrokerCapacity { diff --git a/pkg/resources/cruisecontrol/configmap_test.go b/pkg/resources/cruisecontrol/configmap_test.go index b2d8cc8a3..9759042df 100644 --- a/pkg/resources/cruisecontrol/configmap_test.go +++ b/pkg/resources/cruisecontrol/configmap_test.go @@ -694,7 +694,7 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { { "brokerId": "4", "capacity": { - "DISK": {}, + "DISK": {"/path1/kafka": "100"}, "CPU": "200", "NW_IN": "125000", "NW_OUT": "125000" @@ -842,7 +842,7 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { { "brokerId": "4", "capacity": { - "DISK": {}, + "DISK": {"/path1/kafka": "100"}, "CPU": "200", "NW_IN": "125000", "NW_OUT": "125000" @@ -970,6 +970,18 @@ func TestGenerateCapacityConfigWithUserProvidedInput(t *testing.T) { Limits: v1.ResourceList{ "cpu": cpuQuantity, }}, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/path1", + PvcSpec: &v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("100M"), + }, + }, + }, + }, + }, }, }, }, From 7cf258f7d6c6a8c62bcb094f203439d7c959cbfa Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Mon, 7 Feb 2022 11:08:13 +0100 Subject: [PATCH 8/8] Corrected minor errors, added comments --- pkg/resources/cruisecontrol/configmap.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index e7411cfbd..2c1fcde71 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -169,6 +169,7 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, } // If the -1 default exists we don't have to do anything else here since all brokers will have values. 
if brokerId == "-1" { + log.Info("Using user provided capacity config because it has universal default defined", "capacity config", userProvidedCapacityConfig) return userProvidedCapacityConfig, nil } userConfigBrokerIds = append(userConfigBrokerIds, brokerId) @@ -184,7 +185,7 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, } // If there was no user provided config we shall generate all configuration or - // addig generated values to all Brokers not provided by the user. + // adding generated values to all Brokers not provided by the user. brokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, userConfigBrokerIds) if err != nil { return "", err @@ -193,9 +194,9 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, capacityConfig.Capacities = append(capacityConfig.Capacities, brokerCapacities...) result, err := json.MarshalIndent(capacityConfig, "", " ") if err != nil { - return "", err + return "", errors.WrapIf(err, "could not marshal cruise control capacity config") } - log.Info(fmt.Sprintf("generated capacity config was successful with values: %s", result)) + log.Info("broker capacity config generated successfully", "capacity config", string(result)) return string(result), err } @@ -206,6 +207,7 @@ func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log log for brokerId := range kafkaCluster.Status.BrokersState { brokerIdFromStatus = append(brokerIdFromStatus, brokerId) } + // Since maps aren't ordered we need to order this list before using it sort.Strings(brokerIdFromStatus) for _, userConfigBrokerId := range userConfigBrokerIds { @@ -245,13 +247,15 @@ func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log log log.Info("broker spec not found, using default fallback") brokerCapacity = generateDefaultBrokerCapacityWithId(brokerId) } - log.Info("the following brokerCapacity was generated", "brokerCapacity", brokerCapacity) + log.V(1).Info("capacity config successfully generated for broker", "capacity config", brokerCapacity) brokerCapacities = append(brokerCapacities, &brokerCapacity) } return brokerCapacities, nil } +// Generate default broker capacity +// This value is used by every broker not in the spec, for example when deleting a broker func generateDefaultBrokerCapacityWithId(brokerId string) BrokerCapacity { return BrokerCapacity{ BrokerID: brokerId,
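
The series does not include a standalone illustration of the behavior the final revision converges on, so below is a minimal, self-contained Go sketch of the merge rule, using simplified types rather than the koperator API (brokerCapacity and mergeCapacities are illustrative names, the DISK sub-map is omitted, and spec-derived generation and the default fallback are collapsed into one generated entry): capacity entries the user provides in CruiseControlConfig.CapacityConfig are kept as-is (a brokerId of "-1" in the user config short-circuits generation entirely, as GenerateCapacityConfig does after these patches), while every broker id found in the cluster status but not in the user config gets an entry appended for it.

package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// brokerCapacity mirrors the shape of one entry in Cruise Control's capacity JSON.
type brokerCapacity struct {
	BrokerID string            `json:"brokerId"`
	Capacity map[string]string `json:"capacity"`
	Doc      string            `json:"doc"`
}

// mergeCapacities keeps userEntries untouched and appends a generated entry for
// every id in statusIDs that the user did not already cover.
func mergeCapacities(userEntries []brokerCapacity, statusIDs map[string]struct{}) []brokerCapacity {
	covered := make(map[string]bool, len(userEntries))
	for _, e := range userEntries {
		covered[e.BrokerID] = true
	}

	// Map iteration order is random, so sort the ids to keep the output stable.
	ids := make([]string, 0, len(statusIDs))
	for id := range statusIDs {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	merged := append([]brokerCapacity{}, userEntries...)
	for _, id := range ids {
		if covered[id] {
			continue
		}
		merged = append(merged, brokerCapacity{
			BrokerID: id,
			Capacity: map[string]string{"CPU": "100", "NW_IN": "125000", "NW_OUT": "125000"},
			Doc:      "generated fallback entry",
		})
	}
	return merged
}

func main() {
	// Broker 1 is overridden by the user; brokers 0 and 2 exist only in the status.
	user := []brokerCapacity{{BrokerID: "1", Capacity: map[string]string{"CPU": "150"}, Doc: "user override"}}
	status := map[string]struct{}{"0": {}, "1": {}, "2": {}}

	out, _ := json.MarshalIndent(mergeCapacities(user, status), "", "  ")
	fmt.Println(string(out))
}

Sorting the status-derived ids before generating entries mirrors sort.Strings(brokerIdFromStatus) in the patches; since Go randomizes map iteration order, the rendered capacity config could otherwise change between reconciliations even when nothing else did.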