diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 4a31ff0f6..b1a85426a 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -884,6 +884,12 @@ func (r *Reconciler) reconcileKafkaPvc(log logr.Logger, brokersDesiredPvcs map[s if mountPath == pvc.Annotations["mountPath"] { currentPvc = pvc.DeepCopy() alreadyCreated = true + // Checking pvc state, if bounded, so the broker has already restarted and the CC GracefulDiskRebalance has not happened yet, + // then we make it happening with status update. + if _, ok := r.KafkaCluster.Status.BrokersState[brokerId].GracefulActionState.VolumeStates[mountPath]; !ok && + currentPvc.Status.Phase == corev1.ClaimBound { + brokerVolumesState[mountPath] = v1beta1.VolumeState{CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceRequired} + } break } } @@ -896,7 +902,6 @@ func (r *Reconciler) reconcileKafkaPvc(log logr.Logger, brokersDesiredPvcs map[s if err := r.Client.Create(context.TODO(), desiredPvc); err != nil { return errorfactory.New(errorfactory.APIFailure{}, err, "creating resource failed", "kind", desiredType) } - brokerVolumesState[desiredPvc.Annotations["mountPath"]] = v1beta1.VolumeState{CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceRequired} continue } if err == nil {