Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allow to delete volume from brokers #820

Merged
merged 15 commits into from
Jun 16, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ webhooks:
resources:
- kafkatopics
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
caBundle: {{ $caCrt }}
service:
name: "{{ include "kafka-operator.fullname" . }}-operator"
namespace: {{ .Release.Namespace }}
path: /validate
stoader marked this conversation as resolved.
Show resolved Hide resolved
failurePolicy: Fail
name: kafkaclusters.kafka.banzaicloud.io
rules:
- apiGroups:
- kafka.banzaicloud.io
apiVersions:
- v1beta1
operations:
- UPDATE
resources:
- kafkaclusters
sideEffects: None
---
apiVersion: v1
kind: Secret
Expand Down
78 changes: 72 additions & 6 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package kafka

import (
"context"
"fmt"
"sort"
"strings"

"emperror.dev/errors"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"

apiutil "github.com/banzaicloud/koperator/api/util"
"github.com/banzaicloud/koperator/api/v1alpha1"
Expand All @@ -34,6 +38,8 @@ import (
properties "github.com/banzaicloud/koperator/properties/pkg"
)

const brokerLogDirPropertyName = "log.dirs"

func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32,
extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList,
serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) *properties.Properties {
Expand Down Expand Up @@ -106,10 +112,24 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
log.Error(err, "setting broker.id in broker configuration resulted an error")
}

// Storage configuration
storageConf := generateStorageConfig(bConfig.StorageConfigs)
if storageConf != "" {
if err := config.Set("log.dirs", storageConf); err != nil {
// This logic prevents the removal of the mountPath from the broker configmap
brokerConfigMapName := fmt.Sprintf(brokerConfigTemplate+"-%d", r.KafkaCluster.Name, id)
var brokerConfigMapOld v1.ConfigMap
err = r.Client.Get(context.Background(), client.ObjectKey{Name: brokerConfigMapName, Namespace: r.KafkaCluster.GetNamespace()}, &brokerConfigMapOld)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "getting broker configmap from the Kubernetes API server resulted an error")
}

mountPathsOld := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld)
mountPathsNew := generateStorageConfig(bConfig.StorageConfigs)
mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)

if isMountPathRemoved {
log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id)
}

if len(mountPathsMerged) != 0 {
if err := config.Set(brokerLogDirPropertyName, strings.Join(mountPathsMerged, ",")); err != nil {
log.Error(err, "setting log.dirs in broker configuration resulted an error")
}
}
Expand All @@ -124,6 +144,36 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
return config
}

// mergeMountPaths is merges the new mountPaths with the old.
// It returns the merged []string and a bool which true or false depend on mountPathsNew contains or not all of the elements of the mountPathsOld
func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) {
var mountPathsMerged []string
mountPathsMerged = append(mountPathsMerged, mountPathsOld...)
mountPathsOldLen := len(mountPathsOld)
// Merging the new mountPaths with the old. If any of them is removed we can check the difference in the mountPathsOldLen
for i := range mountPathsNew {
found := false
for k := range mountPathsOld {
if mountPathsOld[k] == mountPathsNew[i] {
stoader marked this conversation as resolved.
Show resolved Hide resolved
found = true
mountPathsOldLen--
break
}
}
// if this is a new mountPath then add it to te current
if !found {
mountPathsMerged = append(mountPathsMerged, mountPathsNew[i])
}
}
// If any of them is removed we can check the difference in the mountPathsOldLen
isMountPathRemoved := false
if mountPathsOldLen > 0 {
isMountPathRemoved = true
}

return mountPathsMerged, isMountPathRemoved
}

func generateSuperUsers(users []string) (suStrings []string) {
suStrings = make([]string, 0)
for _, x := range users {
Expand Down Expand Up @@ -180,12 +230,28 @@ func appendListenerConfigs(advertisedListenerConfig []string, id int32,
return advertisedListenerConfig
}

func generateStorageConfig(sConfig []v1beta1.StorageConfig) string {
func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) []string {
if configMap == nil {
return nil
}
brokerConfig := configMap.Data[kafkautils.ConfigPropertyName]
brokerConfigsLines := strings.Split(brokerConfig, "\n")
bartam1 marked this conversation as resolved.
Show resolved Hide resolved
var mountPaths string
for i := range brokerConfigsLines {
keyVal := strings.Split(brokerConfigsLines[i], "=")
if len(keyVal) == 2 && keyVal[0] == brokerLogDirPropertyName {
mountPaths = keyVal[1]
}
}
return strings.Split(mountPaths, ",")
}

func generateStorageConfig(sConfig []v1beta1.StorageConfig) []string {
mountPaths := make([]string, 0, len(sConfig))
for _, storage := range sConfig {
mountPaths = append(mountPaths, util.StorageConfigKafkaMountPath(storage.MountPath))
}
return strings.Join(mountPaths, ",")
return mountPaths
}

func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) string {
Expand Down
29 changes: 29 additions & 0 deletions pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,49 @@
package kafka

import (
"reflect"
"testing"

"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/banzaicloud/koperator/pkg/util"
kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"

properties "github.com/banzaicloud/koperator/properties/pkg"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/resources"
)

func TestGetMountPathsFromBrokerConfigMap(t *testing.T) {
tests := []struct {
testName string
brokerConfigMap v1.ConfigMap
expectedLogDirs []string
}{
{
testName: "1",
brokerConfigMap: v1.ConfigMap{
Data: map[string]string{kafkautils.ConfigPropertyName: `inter.broker.listener.name=INTERNAL\nlistener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs3/kafka,/kafka-logs/kafka,/kafka-logs2/kafka,/kafka-logs4/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter\noffsets.topic.replication.factor=2
zookeeper.connect=zookeeper-server-client.zookeeper:2181/
`},
},
expectedLogDirs: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
},
}
for _, test := range tests {
logDirs := getMountPathsFromBrokerConfigMap(&test.brokerConfigMap)
if !reflect.DeepEqual(logDirs, test.expectedLogDirs) {
t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs)
}
}
}

func TestGenerateBrokerConfig(t *testing.T) { //nolint funlen
tests := []struct {
testName string
Expand Down
73 changes: 73 additions & 0 deletions pkg/webhook/kafkacluster_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright © 2019 Banzai Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhook

import (
"context"
"fmt"

admissionv1 "k8s.io/api/admission/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
)

func (s *webhookServer) validateKafkaCluster(kafkaClusterNew *banzaicloudv1beta1.KafkaCluster) *admissionv1.AdmissionResponse {
ctx := context.Background()
// get the Old kafkaCluster CR
kafkaClusterSpecOld := banzaicloudv1beta1.KafkaCluster{}
err := s.client.Get(ctx, client.ObjectKey{Name: kafkaClusterNew.GetName(), Namespace: kafkaClusterNew.GetNamespace()}, &kafkaClusterSpecOld)
if err != nil {
// New kafkaCluster has been added thus no need to check storage removal
if apierrors.IsNotFound(err) {
return nil
}
log.Error(err, "couldn't get KafkaCluster custom resource")
return notAllowed("API failure while retrieving KafkaCluster CR, please try again", metav1.StatusReasonInternalError)
}

return checkBrokerStorageRemoval(&kafkaClusterSpecOld.Spec, &kafkaClusterNew.Spec)
}

// checkBrokerStorageRemoval checks if there is any broker storage which has been removed. If yes, admission will be rejected
func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaicloudv1beta1.KafkaClusterSpec) *admissionv1.AdmissionResponse {
bartam1 marked this conversation as resolved.
Show resolved Hide resolved
for _, brokerOld := range kafkaClusterSpecOld.Brokers {
for _, brokerNew := range kafkaClusterSpecNew.Brokers {
if brokerOld.Id == brokerNew.Id {
brokerConfigsOld, _ := brokerOld.GetBrokerConfig(*kafkaClusterSpecOld)
brokerConfigsNew, _ := brokerNew.GetBrokerConfig(*kafkaClusterSpecNew)
for _, storageConfigOld := range brokerConfigsOld.StorageConfigs {
isStorageFound := false

for _, storageConfigNew := range brokerConfigsNew.StorageConfigs {
if storageConfigOld.MountPath == storageConfigNew.MountPath {
isStorageFound = true
break
}
}
if !isStorageFound {
log.Info(fmt.Sprintf("Not allowed to remove broker storage with mountPath: %s from brokerID: %v", storageConfigOld.MountPath, brokerOld.Id))
return notAllowed(fmt.Sprintf("Removing storage from a runnng broker is not supported! (mountPath: %s, brokerID: %v)", storageConfigOld.MountPath, brokerOld.Id), metav1.StatusReasonInvalid)
}
}
}
}
}
pregnor marked this conversation as resolved.
Show resolved Hide resolved
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}