Skip to content

Commit

Permalink
Support kafka cluster rebalance between broker disks when JBOD config…
Browse files Browse the repository at this point in the history
… is used (banzaicloud#894)
  • Loading branch information
bartam1 committed Nov 22, 2022
1 parent 76ec4ce commit c51f8a7
Show file tree
Hide file tree
Showing 4 changed files with 677 additions and 26 deletions.
155 changes: 132 additions & 23 deletions controllers/cruisecontroltask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand All @@ -25,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,13 +38,17 @@ import (

apiutil "github.com/banzaicloud/koperator/api/util"
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
koperatorccconf "github.com/banzaicloud/koperator/pkg/resources/cruisecontrol"
"github.com/banzaicloud/koperator/pkg/scale"
"github.com/banzaicloud/koperator/pkg/util"
)

const (
DefaultRequeueAfterTimeInSec = 20
BrokerCapacityDisk = "DISK"
BrokerCapacity = "capacity"
)

// CruiseControlTaskReconciler reconciles a kafka cluster object
Expand Down Expand Up @@ -165,27 +171,17 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
removeTask.SetStateScheduled()

case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRebalance) > 0:
logDirsByBroker, err := scaler.LogDirsByBroker()
if err != nil {
log.Error(err, "failed to get list of volumes per broker from Cruise Control")
return requeueAfter(DefaultRequeueAfterTimeInSec)
}
brokerIDs := make([]string, 0)
unavailableBrokerIDs := make([]string, 0)
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) {
brokerIDs = append(brokerIDs, task.BrokerID)
found := false
if onlineDirs, ok := logDirsByBroker[task.BrokerID][scale.LogDirStateOnline]; ok {
for _, dir := range onlineDirs {
if strings.HasPrefix(strings.TrimSpace(dir), strings.TrimSpace(task.Volume)) {
found = true
}
}
if !found {
unavailableBrokerIDs = append(unavailableBrokerIDs, task.BrokerID)
}
}
}

unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(scaler, tasksAndStates)
if err != nil {
log.Error(err, "failed to get unavailable brokers at rebalance")
return requeueAfter(DefaultRequeueAfterTimeInSec)
}

if len(unavailableBrokerIDs) > 0 {
log.Info("requeue as there are offline broker log dirs for rebalance", "brokerIDs", unavailableBrokerIDs)
// This requeue is not necessary because the CruiseControlOperation controller retries the errored task
Expand All @@ -194,15 +190,38 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
return requeueAfter(DefaultRequeueAfterTimeInSec)
}

cruiseControlOpRef, err := r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs)
allBrokerIDs := make([]string, 0, len(instance.Spec.Brokers))
for i := range instance.Spec.Brokers {
allBrokerIDs = append(allBrokerIDs, fmt.Sprint(instance.Spec.Brokers[i].Id))
}
// Rebalancing between a broker's disks is only possible when the JBOD capacity config is used.
// This selector separates the JBOD brokers from the non-JBOD brokers.
// All brokers have to be checked to find out whether there are any non-JBOD brokers, because
// CC cannot do a disk rebalance when at least one of the brokers has a non-JBOD capacity configuration.
_, brokersNotJBOD, err := brokersJBODSelector(allBrokerIDs, instance.Spec.CruiseControlConfig.CapacityConfig)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing disks has failed, brokerIDs: %s", brokerIDs), err)
return requeueWithError(log, "failed to determine which broker using JBOD or not JBOD capacity configuration at rebalance operation", err)
}

var cruiseControlOpRef corev1.LocalObjectReference
// when there is at least one not JBOD broker in the kafka cluster CC cannot do the disk rebalance :(
if len(brokersNotJBOD) > 0 {
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, false)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing not JBOD disks has failed, brokerIDs: %s", brokerIDs), err)
}
} else {
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, true)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing not JBOD disks has failed, brokerIDs: %s", brokerIDs), err)
}
}

for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) {
if task == nil {
continue
}

task.SetCruiseControlOperationRef(cruiseControlOpRef)
task.SetStateScheduled()
}
Expand All @@ -215,6 +234,29 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
return reconciled()
}

// checkBrokerLogDirsAvailability returns the IDs of the brokers whose active rebalance task targets a volume
// that is not among the broker's online log dirs as reported by the scaler.
//
// A task's volume counts as available when at least one online log dir of its broker starts with the
// whitespace-trimmed volume path. A broker for which the scaler reports no online log dir entry at all is
// not reported as unavailable. The returned slice is never nil on success.
//
// An error is returned when the log dirs cannot be fetched from the scaler.
func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) {
	logDirsByBroker, err := scaler.LogDirsByBroker()
	if err != nil {
		return nil, errors.Wrap(err, "failed to get list of volumes per broker from Cruise Control")
	}

	unavailableBrokerIDs = make([]string, 0)
	for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) {
		// The reconciler skips nil tasks when iterating the same list; do the same here,
		// otherwise reading task.BrokerID would panic.
		if task == nil {
			continue
		}

		onlineDirs, ok := logDirsByBroker[task.BrokerID][scale.LogDirStateOnline]
		if !ok {
			continue
		}

		volume := strings.TrimSpace(task.Volume)
		found := false
		for _, dir := range onlineDirs {
			if strings.HasPrefix(strings.TrimSpace(dir), volume) {
				found = true
				// One matching online log dir is enough; no need to scan the rest.
				break
			}
		}
		if !found {
			unavailableBrokerIDs = append(unavailableBrokerIDs, task.BrokerID)
		}
	}
	return unavailableBrokerIDs, nil
}

func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) {
states := []scale.KafkaBrokerState{scale.KafkaBrokerAlive, scale.KafkaBrokerNew}
// This can result NullPointerException when the capacity calculation is missing for a broker in the cruisecontrol configmap
Expand All @@ -239,15 +281,15 @@ func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string)
}

func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs)
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false)
}

func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID})
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false)
}

func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs)
func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string, isJBOD bool) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs, isJBOD)
}

func (r *CruiseControlTaskReconciler) createCCOperation(
Expand All @@ -257,6 +299,7 @@ func (r *CruiseControlTaskReconciler) createCCOperation(
ttlSecondsAfterFinished *int,
operationType banzaiv1alpha1.CruiseControlTaskOperation,
bokerIDs []string,
isJBOD bool,
) (corev1.LocalObjectReference, error) {
operation := &banzaiv1alpha1.CruiseControlOperation{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -290,6 +333,9 @@ func (r *CruiseControlTaskReconciler) createCCOperation(

if operationType == banzaiv1alpha1.OperationRebalance {
operation.Status.CurrentTask.Parameters["destination_broker_ids"] = strings.Join(bokerIDs, ",")
if isJBOD {
operation.Status.CurrentTask.Parameters["rebalance_disk"] = "true"
}
} else {
operation.Status.CurrentTask.Parameters["brokerid"] = strings.Join(bokerIDs, ",")
}
Expand All @@ -302,6 +348,69 @@ func (r *CruiseControlTaskReconciler) createCCOperation(
}, nil
}

// brokersJBODSelector splits brokerIDs into the brokers that use a JBOD disk capacity configuration and the
// brokers that do not, based on the user-provided capacity config JSON.
//
// Rules:
//   - An empty capacityConfigJSON means the JBOD config is generated by default: brokerIDs is returned
//     unchanged as brokersJBOD.
//   - A capacity entry is JBOD when its capacity.DISK field is a map. When the field cannot be read as a map
//     the entry is considered non-JBOD. Entries that are not JSON objects, have no broker ID, or have no
//     capacity.DISK field are ignored.
//   - The entry with broker ID "-1" is the default for every broker without an entry of its own. A
//     broker-specific entry always takes precedence over the default, regardless of the order in which the
//     entries are listed. When an ID occurs more than once, the last entry wins.
//   - Brokers covered by neither a specific entry nor a default entry are considered JBOD.
//
// The returned slices follow the order of brokerIDs, and a broker ID that is listed more than once is
// reported only once. An error is returned when the JSON cannot be unmarshalled or a broker ID in the
// capacity config is not a string.
func brokersJBODSelector(brokerIDs []string, capacityConfigJSON string) (brokersJBOD []string, brokersNotJBOD []string, err error) {
	// brokerID -1 means all brokers get this capacity config as default
	const defaultBrokerID = "-1"

	// JBOD is generated by default
	if capacityConfigJSON == "" {
		return brokerIDs, nil, nil
	}

	var capacityConfig koperatorccconf.JBODInvariantCapacityConfig
	err = json.Unmarshal([]byte(capacityConfigJSON), &capacityConfig)
	if err != nil {
		return nil, nil, errors.Wrap(err, "could not unmarshal the user-provided broker capacity config")
	}

	// The default and the broker-specific settings are collected separately and only combined at the end,
	// so that a specific override is honoured even when it is listed before the "-1" default entry.
	defaultIsJBOD := true
	explicitIsJBOD := make(map[string]bool)
	for _, brokerCapacity := range capacityConfig.Capacities {
		brokerCapacityMap, ok := brokerCapacity.(map[string]interface{})
		if !ok {
			continue
		}

		brokerID, hasID, idErr := unstructured.NestedString(brokerCapacityMap, v1beta1.BrokerIdLabelKey)
		if idErr != nil {
			return nil, nil, errors.WrapIfWithDetails(idErr,
				"could not retrieve broker Id from broker capacity configuration",
				"capacity configuration", brokerCapacityMap)
		}
		if !hasID {
			continue
		}

		_, hasDiskMap, diskErr := unstructured.NestedMap(brokerCapacityMap, BrokerCapacity, BrokerCapacityDisk)
		if diskErr == nil && !hasDiskMap {
			// no disk capacity in this entry, it tells nothing about JBOD usage
			continue
		}
		// when the format is not a map[string]interface then it has been considered as not JBOD
		isJBOD := diskErr == nil

		if brokerID == defaultBrokerID {
			defaultIsJBOD = isJBOD
			continue
		}
		explicitIsJBOD[brokerID] = isJBOD
	}

	// Walk the input slice instead of a map to keep the output order deterministic.
	seen := make(map[string]struct{}, len(brokerIDs))
	for _, brokerID := range brokerIDs {
		if _, duplicate := seen[brokerID]; duplicate {
			continue
		}
		seen[brokerID] = struct{}{}

		isJBOD, hasOverride := explicitIsJBOD[brokerID]
		if !hasOverride {
			isJBOD = defaultIsJBOD
		}
		if isJBOD {
			brokersJBOD = append(brokersJBOD, brokerID)
		} else {
			brokersNotJBOD = append(brokersNotJBOD, brokerID)
		}
	}
	return brokersJBOD, brokersNotJBOD, nil
}

// UpdateStatus updates the Status of the provided banzaiv1beta1.KafkaCluster instance with the status of the tasks
// from a CruiseControlTasksAndStates and sends the updates to the Kubernetes API if any changes in the Status field is
// detected. Otherwise, this step is skipped.
Expand Down
Loading

0 comments on commit c51f8a7

Please sign in to comment.