Skip to content

Commit

Permalink
Fix empty mountPath
Browse files Browse the repository at this point in the history
  • Loading branch information
bartam1 committed Jun 9, 2022
1 parent 3c935f3 commit cbd7bac
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
9 changes: 7 additions & 2 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)

if isMountPathRemoved {
log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id)
log.Error(errors.New("removing storage from a running broker is not supported"), "", "brokerID", id, "old mountPaths", mountPathsOld, "new mountPaths", mountPathsNew)
}

if len(mountPathsMerged) != 0 {
Expand Down Expand Up @@ -160,7 +160,7 @@ func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) {
break
}
}
// if this is a new mountPath then add it to te current
// if this is a new mountPath then add it to the current
if !found {
mountPathsMerged = append(mountPathsMerged, mountPathsNew[i])
}
Expand Down Expand Up @@ -243,6 +243,11 @@ func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) []string {
mountPaths = keyVal[1]
}
}

if mountPaths == "" {
return nil
}

return strings.Split(mountPaths, ",")
}

Expand Down
70 changes: 67 additions & 3 deletions pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestGetMountPathsFromBrokerConfigMap(t *testing.T) {
expectedLogDirs []string
}{
{
testName: "1",
testName: "simple case",
brokerConfigMap: v1.ConfigMap{
Data: map[string]string{kafkautils.ConfigPropertyName: `inter.broker.listener.name=INTERNAL\nlistener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093
Expand All @@ -49,11 +49,75 @@ zookeeper.connect=zookeeper-server-client.zookeeper:2181/
},
expectedLogDirs: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
},
{
testName: "no old configs",
brokerConfigMap: v1.ConfigMap{
Data: map[string]string{},
},
expectedLogDirs: []string{},
},
}
for _, test := range tests {
logDirs := getMountPathsFromBrokerConfigMap(&test.brokerConfigMap)
if !reflect.DeepEqual(logDirs, test.expectedLogDirs) {
t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs)
if len(logDirs) != 0 && len(test.expectedLogDirs) != 0 {
if !reflect.DeepEqual(logDirs, test.expectedLogDirs) {
t.Errorf("expected: %s, got: %s", test.expectedLogDirs, logDirs)
}
}
}
}

func TestMergeMountPaths(t *testing.T) {
tests := []struct {
testName string
mountPathNew []string
mountPathOld []string
expectedMergedMountPath []string
expectedRemoved bool
}{
{
testName: "no old mountPath",
mountPathNew: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
mountPathOld: []string{},
expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedRemoved: false,
},
{
testName: "same",
mountPathNew: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedRemoved: false,
},
{
testName: "changed order",
mountPathNew: []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedRemoved: false,
},
{
testName: "removed one",
mountPathNew: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedRemoved: true,
},
{
testName: "removed all",
mountPathNew: []string{},
mountPathOld: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
expectedRemoved: true,
},
}
for _, test := range tests {
mergedMountPaths, isRemoved := mergeMountPaths(test.mountPathOld, test.mountPathNew)
if !reflect.DeepEqual(mergedMountPaths, test.expectedMergedMountPath) {
t.Errorf("expected: %s, got: %s", test.expectedMergedMountPath, mergedMountPaths)
}
if isRemoved != test.expectedRemoved {
t.Errorf("expectedRemoved: %v, got: %v", test.expectedRemoved, isRemoved)
}
}
}
Expand Down

0 comments on commit cbd7bac

Please sign in to comment.