Removed default value from capacity.json, assigning default value for each broker now (#760)

* Removed default value from capacity.json, assigning default value for each broker now.

* Refactored code based on comments

* Corrected linter error

* Added unit test requested in reviews and small changes

* Reworked user added config part

* Fixed code based on comments

* Fixes concerning code duplication

* Corrected minor errors, added comments

Co-authored-by: Balint Molnar <balintmolnar91@gmail.com>
Kuvesz and baluchicken committed Feb 7, 2022
1 parent 68a6465 commit 4ee4330
Showing 3 changed files with 301 additions and 120 deletions.
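For illustration, a minimal sketch of a user-provided capacity config (the value of kafkaCluster.Spec.CruiseControlConfig.CapacityConfig) that covers only broker "0"; the top-level "brokerCapacities" key and the sample values are assumptions based on the Cruise Control capacity JSON shown in the test fixture below. With this change the operator fills in generated entries for every broker the user did not cover, instead of requiring a universal "-1" entry.

// Illustrative only: per-broker override for a single broker; the remaining brokers
// get generated values (see pkg/resources/cruisecontrol/configmap.go below).
const exampleUserCapacityConfig = `{
  "brokerCapacities": [
    {
      "brokerId": "0",
      "capacity": {
        "DISK": { "/kafka-logs/kafka": "50000" },
        "CPU": "100",
        "NW_IN": "125000",
        "NW_OUT": "125000"
      },
      "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
    }
  ]
}`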
12 changes: 0 additions & 12 deletions controllers/tests/kafkacluster_controller_cruisecontrol_test.go
@@ -151,18 +151,6 @@ zookeeper.connect=/
"NW_OUT": "125000"
},
"doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
},
{
"brokerId": "-1",
"capacity": {
"DISK": {
"/kafka-logs/kafka": "10737"
},
"CPU": "100",
"NW_IN": "125000",
"NW_OUT": "125000"
},
"doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
}
]
}`))
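After this change the generated capacity config no longer carries the "-1" fallback entry removed above; instead it contains one entry per broker id taken from the cluster status. A hypothetical result for a two-broker cluster, reusing the default values from the fixture (broker ids assumed):

// Hypothetical generated output; every broker gets its own entry, no "-1" default.
const exampleGeneratedCapacityConfig = `{
  "brokerCapacities": [
    { "brokerId": "0", "capacity": { "DISK": { "/kafka-logs/kafka": "10737" }, "CPU": "100", "NW_IN": "125000", "NW_OUT": "125000" }, "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." },
    { "brokerId": "1", "capacity": { "DISK": { "/kafka-logs/kafka": "10737" }, "CPU": "100", "NW_IN": "125000", "NW_OUT": "125000" }, "doc": "Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." }
  ]
}`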
151 changes: 88 additions & 63 deletions pkg/resources/cruisecontrol/configmap.go
@@ -17,13 +17,15 @@ package cruisecontrol
import (
"encoding/json"
"fmt"
"sort"
"strconv"

"emperror.dev/errors"

"github.com/go-logr/logr"
"gopkg.in/inf.v0"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

"github.com/banzaicloud/koperator/api/v1alpha1"
@@ -140,16 +142,38 @@ type JBODInvariantCapacityConfig struct {
func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, config *corev1.ConfigMap) (string, error) {
var err error

log.Info("Generating capacity config")
log.Info("generating capacity config")

var capacityConfig JBODInvariantCapacityConfig
var userConfigBrokerIds []string
// If there is already a config added manually, use that one
if kafkaCluster.Spec.CruiseControlConfig.CapacityConfig != "" {
userProvidedCapacityConfig := kafkaCluster.Spec.CruiseControlConfig.CapacityConfig
updatedProvidedCapacityConfig := ensureContainsDefaultBrokerCapacity([]byte(userProvidedCapacityConfig), log)
if updatedProvidedCapacityConfig != nil {
return string(updatedProvidedCapacityConfig), err
err := json.Unmarshal([]byte(userProvidedCapacityConfig), &capacityConfig)
if err != nil {
return "", errors.Wrap(err, "could not unmarshal the user-provided broker capacity config")
}
for _, brokerCapacity := range capacityConfig.Capacities {
brokerCapacityMap, ok := brokerCapacity.(map[string]interface{})
if !ok {
continue
}
brokerId, ok, err := unstructured.NestedString(brokerCapacityMap, "brokerId")
if err != nil {
return "", errors.WrapIfWithDetails(err,
"could retrieve broker Id from broker capacity configuration",
"capacity configuration", brokerCapacityMap)
}
if !ok {
continue
}
// If the -1 default exists we don't have to do anything else here since all brokers will have values.
if brokerId == "-1" {
log.Info("Using user provided capacity config because it has universal default defined", "capacity config", userProvidedCapacityConfig)
return userProvidedCapacityConfig, nil
}
userConfigBrokerIds = append(userConfigBrokerIds, brokerId)
}
return userProvidedCapacityConfig, err
}
// During cluster downscale the CR does not contain data for brokers being downscaled which is
// required to generate the proper capacity json for CC so we are reusing the old one.
@@ -160,80 +184,81 @@ func GenerateCapacityConfig(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger,
}
}

capacityConfig := CapacityConfig{}

for _, broker := range kafkaCluster.Spec.Brokers {
brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log)
if err != nil {
return "", errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id)
}
brokerCapacity := BrokerCapacity{
BrokerID: fmt.Sprintf("%d", broker.Id),
Capacity: Capacity{
DISK: brokerDisks,
CPU: generateBrokerCPU(broker, kafkaCluster.Spec, log),
NWIN: generateBrokerNetworkIn(broker, kafkaCluster.Spec, log),
NWOUT: generateBrokerNetworkOut(broker, kafkaCluster.Spec, log),
},
Doc: defaultDoc,
}

log.Info("The following brokerCapacity was generated", "brokerCapacity", brokerCapacity)

capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, brokerCapacity)
// If there was no user-provided config we generate the whole configuration here,
// otherwise we only add generated values for the brokers the user did not cover.
brokerCapacities, err := appendGeneratedBrokerCapacities(kafkaCluster, log, userConfigBrokerIds)
if err != nil {
return "", err
}

// adding default broker capacity config
capacityConfig.BrokerCapacities = append(capacityConfig.BrokerCapacities, generateDefaultBrokerCapacity())

capacityConfig.Capacities = append(capacityConfig.Capacities, brokerCapacities...)
result, err := json.MarshalIndent(capacityConfig, "", " ")
if err != nil {
log.Error(err, "Could not marshal cruise control capacity config")
return "", errors.WrapIf(err, "could not marshal cruise control capacity config")
}
log.Info(fmt.Sprintf("Generated capacity config was successful with values: %s", result))

log.Info("broker capacity config generated successfully", "capacity config", string(result))
return string(result), err
}

func ensureContainsDefaultBrokerCapacity(data []byte, log logr.Logger) []byte {
config := JBODInvariantCapacityConfig{}
err := json.Unmarshal(data, &config)
if err != nil {
log.Info("could not unmarshal the user-provided broker capacity config")
return nil
func appendGeneratedBrokerCapacities(kafkaCluster *v1beta1.KafkaCluster, log logr.Logger, userConfigBrokerIds []string) ([]interface{}, error) {
var brokerCapacities []interface{}

brokerIdFromStatus := make([]string, 0, len(kafkaCluster.Status.BrokersState))
for brokerId := range kafkaCluster.Status.BrokersState {
brokerIdFromStatus = append(brokerIdFromStatus, brokerId)
}
for _, brokerConfig := range config.Capacities {
brokerConfigMap, ok := brokerConfig.(map[string]interface{})
if !ok {
continue
}
var brokerId interface{}
brokerId, ok = brokerConfigMap["brokerId"]
if !ok {
continue
}
if brokerId == "-1" {
return nil
}
// Since maps aren't ordered we need to order this list before using it
sort.Strings(brokerIdFromStatus)

for _, userConfigBrokerId := range userConfigBrokerIds {
brokerIdFromStatus = util.StringSliceRemove(brokerIdFromStatus, userConfigBrokerId)
}

// should add default broker capacity config
config.Capacities = append(config.Capacities, generateDefaultBrokerCapacity())
if len(brokerIdFromStatus) == 0 {
return nil, nil
}

result, err := json.MarshalIndent(config, "", " ")
if err != nil {
log.Info("could not marshal the modified broker capacity config")
return nil
for _, brokerId := range brokerIdFromStatus {
brokerCapacity := BrokerCapacity{}
brokerFoundInSpec := false
for _, broker := range kafkaCluster.Spec.Brokers {
if brokerId == strconv.Itoa(int(broker.Id)) {
brokerFoundInSpec = true
brokerDisks, err := generateBrokerDisks(broker, kafkaCluster.Spec, log)
if err != nil {
return nil, errors.WrapIfWithDetails(err, "could not generate broker disks config for broker", "brokerID", broker.Id)
}
brokerCapacity = BrokerCapacity{
BrokerID: strconv.Itoa(int(broker.Id)),
Capacity: Capacity{
DISK: brokerDisks,
CPU: generateBrokerCPU(broker, kafkaCluster.Spec, log),
NWIN: generateBrokerNetworkIn(broker, kafkaCluster.Spec, log),
NWOUT: generateBrokerNetworkOut(broker, kafkaCluster.Spec, log),
},
Doc: defaultDoc,
}
}
}
// A broker that is being removed still needs values in the capacity config, but the exact
// values do not matter, so we set defaults for it here; this way we don't have to deal
// with a universal default.
if !brokerFoundInSpec {
log.Info("broker spec not found, using default fallback")
brokerCapacity = generateDefaultBrokerCapacityWithId(brokerId)
}
log.V(1).Info("capacity config successfully generated for broker", "capacity config", brokerCapacity)

brokerCapacities = append(brokerCapacities, &brokerCapacity)
}
return result
return brokerCapacities, nil
}

// generates default broker capacity
// the exact values do not matter as for every broker it is set explicitly above
// upon broker deletion CruiseControl does not use these value, it requires to detect that a broker exists
func generateDefaultBrokerCapacity() BrokerCapacity {
// Generates a default broker capacity entry.
// It is used for every broker that is not present in the spec, for example a broker being deleted
func generateDefaultBrokerCapacityWithId(brokerId string) BrokerCapacity {
return BrokerCapacity{
BrokerID: "-1",
BrokerID: brokerId,
Capacity: Capacity{
DISK: map[string]string{
"/kafka-logs/kafka": "10737",
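A minimal standalone sketch (not the operator's own code) of the broker-id collection performed by the new user-config path in GenerateCapacityConfig above, assuming the "brokerCapacities" JSON key: entries are kept as generic maps so unknown fields survive, and a "-1" entry means every broker is already covered.

package main

import (
	"encoding/json"
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// collectBrokerIds returns the broker ids found in a user-provided capacity config and
// reports whether a universal "-1" entry is present, in which case the config can be
// used as-is because every broker already has values.
func collectBrokerIds(raw []byte) (ids []string, hasUniversalDefault bool, err error) {
	var cfg struct {
		Capacities []interface{} `json:"brokerCapacities"` // key name is an assumption
	}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return nil, false, err
	}
	for _, entry := range cfg.Capacities {
		m, ok := entry.(map[string]interface{})
		if !ok {
			continue
		}
		id, found, err := unstructured.NestedString(m, "brokerId")
		if err != nil || !found {
			continue
		}
		if id == "-1" {
			return nil, true, nil
		}
		ids = append(ids, id)
	}
	return ids, false, nil
}

func main() {
	raw := []byte(`{"brokerCapacities":[{"brokerId":"0"},{"brokerId":"2"}]}`)
	ids, universal, err := collectBrokerIds(raw)
	fmt.Println(ids, universal, err) // [0 2] false <nil>
}

Keeping the entries as interface{} mirrors the JBODInvariantCapacityConfig approach above, so any extra fields the user supplied are preserved when the config is marshalled back.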