Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ process-manifests: kustomize process-manifests-crd ## Generate the kustomized y
$(SED) -i 's/watch-namespace/$(NAMESPACE)/' deployment/manager.yaml; \
elif [ ${PROFILE} == "debug" ]; then \
(cd config/deploy-profiles/${PROFILE} && \
$(KUSTOMIZE) edit set image /redkey-operator=${IMG_DEBUG}); \
$(KUSTOMIZE) edit set image redkey-operator=${IMG_DEBUG}); \
$(KUSTOMIZE) build config/deploy-profiles/debug > deployment/manager.yaml; \
$(SED) -i 's/watch-namespace/$(NAMESPACE)/' deployment/manager.yaml; \
fi
Expand Down
1 change: 1 addition & 0 deletions api/v1/redkeycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
SubstatusUpgradingScalingUp = "ScalingUp"
SubstatusUpgradingScalingDown = "ScalingDown"
SubstatusEndingSlowUpgrading = "EndingSlowUpgrading"
SubstatusRollingConfig = "RollingConfig"

SubstatusFastScaling = "FastScaling"
SubstatusEndingFastScaling = "EndingFastScaling"
Expand Down
159 changes: 145 additions & 14 deletions controllers/redis_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ func (r *RedKeyClusterReconciler) doSlowUpgrade(ctx context.Context, redkeyClust
err = r.doSlowUpgradeEnd(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusUpgradingScalingDown:
err = r.doSlowUpgradeScalingDown(ctx, redkeyCluster, existingStatefulSet)
case redkeyv1.SubstatusRollingConfig:
err = r.doSlowUpgradeRollingUpdate(ctx, redkeyCluster, existingStatefulSet)
default:
err = r.doSlowUpgradeStart(ctx, redkeyCluster, existingStatefulSet)
}
Expand Down Expand Up @@ -292,6 +294,17 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
return nil // Not all nodes ready --> Keep waiting
}

// Check cluster health.
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return err
}
if !check {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for cluster readiness", "errors", errors, "warnings", warnings)
return nil // Cluster not ready --> keep waiting
}

// Update substatus.
err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusSlowUpgrading, "")
if err != nil {
Expand Down Expand Up @@ -324,6 +337,17 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
return err
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Check cluster health.
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
Expand Down Expand Up @@ -372,13 +396,13 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
r.logInfo(redkeyCluster.NamespacedName(), "Waiting to complete moving slots", "From node", currentPartition, "To node", currentPartition+1)
return nil // Move slots not completed --> keep waiting
}
}

// Forget node
err = redkeyRobin.ClusterResetNode(currentPartition)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin resetting the node", "node index", currentPartition)
return err
}
// Stop Robin reconciliation
err = redkeyRobin.SetAndPersistRobinStatus(ctx, r.Client, redkeyCluster, redkeyv1.RobinStatusNoReconciling)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating/persisting Robin status", "status", redkeyv1.RobinStatusNoReconciling)
return err
}

// RollingUpdate
Expand All @@ -394,6 +418,85 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
return err
}

// Reset node
err = redkeyRobin.ClusterResetNode(currentPartition)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin forgeting the node", "node index", currentPartition)
return err
}

err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusRollingConfig, strconv.Itoa(currentPartition))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
}

return nil
}

func (r *RedKeyClusterReconciler) doSlowUpgradeRollingUpdate(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster, existingStatefulSet *v1.StatefulSet) error {

var err error

// Check Redis node pods rediness.
nodePodsReady, err := r.allPodsReady(ctx, redkeyCluster)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Could not check for Redis node pods being ready")
return err
}
if !nodePodsReady {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for Redis node pods to become ready")
return nil // Not all pods ready -> keep waiting
}
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)

// Get Robin.
logger := r.getHelperLogger(redkeyCluster.NamespacedName())
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return err
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Check cluster health.
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return err
}
if !check {
r.logInfo(redkeyCluster.NamespacedName(), "Waiting for cluster readiness", "errors", errors, "warnings", warnings)
return nil // Cluster not ready --> keep waiting
}

// Get the current partition and update Upgrading Partition in RedKeyCluster Status if starting iterating over partitions.
var currentPartition int
if redkeyCluster.Status.Substatus.UpgradingPartition == "" {
currentPartition = int(*(existingStatefulSet.Spec.Replicas)) - 1
err := r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusSlowUpgrading, strconv.Itoa(currentPartition))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
return err
}
} else {
currentPartition, err = strconv.Atoi(redkeyCluster.Status.Substatus.UpgradingPartition)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Upgrading Partition from RedKeyCluster object")
return err
}
}

// If first partition reached, we can move to the next step.
// Else step over to the next partition.
if currentPartition == 0 {
Expand All @@ -411,6 +514,13 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
}
}

// Restart Robin reconciliation
err = redkeyRobin.SetAndPersistRobinStatus(ctx, r.Client, redkeyCluster, redkeyv1.RobinStatusUpgrading)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating/persisting Robin status", "status", redkeyv1.RobinStatusUpgrading)
return err
}

return nil
}

Expand All @@ -436,6 +546,17 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl
return err
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
}
if len(clusterNodes.Nodes) != int(*existingStatefulSet.Spec.Replicas) {
r.logInfo(redkeyCluster.NamespacedName(), "Not all cluster nodes are yet ready from Robin")
return nil // Not all nodes ready --> Keep waiting
}

// Check cluster health.
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
Expand All @@ -459,7 +580,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl
return nil // Move slots not completed --> keep waiting
}

// ScaleDown the cluster adding one extra node before start upgrading.
// ScaleDown the cluster
r.logInfo(redkeyCluster.NamespacedName(), "Scaling down the cluster to remove the extra node")
_, err = r.updateRdclReplicas(ctx, redkeyCluster, redkeyCluster.Spec.Replicas-1)
if err != nil {
Expand All @@ -471,6 +592,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl
r.logError(redkeyCluster.NamespacedName(), err, "Failed to update StatefulSet replicas")
return err
}

// Reset node
err = redkeyRobin.ClusterResetNode(extraNodeIndex)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin forgeting the node", "node index", extraNodeIndex)
return err
}

err = r.updateClusterSubStatus(ctx, redkeyCluster, redkeyv1.SubstatusUpgradingScalingDown, "0")
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating substatus")
Expand Down Expand Up @@ -519,6 +648,13 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context,
}
}

// Reset node
err = redkeyRobin.ClusterResetNode(int(redkeyCluster.Spec.Replicas) + 1)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin forgeting the node", "node index", int(redkeyCluster.Spec.Replicas)+1)
return err
}

// Check all cluster nodes are ready from Robin.
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
Expand Down Expand Up @@ -737,14 +873,9 @@ func (r *RedKeyClusterReconciler) doFastScaling(ctx context.Context, redkeyClust
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return true, err
}
err = redkeyRobin.SetStatus(redkeyv1.RobinStatusNoReconciling)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin status", "status", redkeyv1.RobinStatusNoReconciling)
return true, err
}
err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.RobinStatusNoReconciling)
err = redkeyRobin.SetAndPersistRobinStatus(ctx, r.Client, redkeyCluster, redkeyv1.RobinStatusNoReconciling)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin status", "status", redkeyv1.RobinStatusNoReconciling)
r.logError(redkeyCluster.NamespacedName(), err, "Error updating/persisting Robin status", "status", redkeyv1.RobinStatusNoReconciling)
return true, err
}

Expand Down
11 changes: 6 additions & 5 deletions controllers/redkeycluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

const (
DefaultRequeueTimeout time.Duration = 5
ScalingDefaultTimeout time.Duration = 30
ReadyRequeueTimeout time.Duration = 30
ErrorRequeueTimeout time.Duration = 30
DefaultRequeueTimeout time.Duration = 5
UpgradingDefaultTimeout time.Duration = 30
ScalingDefaultTimeout time.Duration = 30
ReadyRequeueTimeout time.Duration = 30
ErrorRequeueTimeout time.Duration = 30
)

func NewRedKeyClusterReconciler(mgr ctrl.Manager, maxConcurrentReconciles int, concurrentMigrates int) *RedKeyClusterReconciler {
Expand Down Expand Up @@ -284,7 +285,7 @@ func (r *RedKeyClusterReconciler) reconcileStatusUpgrading(ctx context.Context,
r.Recorder.Event(redkeyCluster, "Warning", "ClusterError", err.Error())
}

return requeue, DefaultRequeueTimeout
return requeue, UpgradingDefaultTimeout
}

func (r *RedKeyClusterReconciler) reconcileStatusScalingDown(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) (bool, time.Duration) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/redkeycluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (r *RedKeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return r.ReconcileClusterObject(ctx, req, redkeyCluster)
} else {
// cluster deleted
r.Log.Info("Can't find RedisCRedKeyClusterluster, probably deleted", "redis-cluster", req.NamespacedName)
r.Log.Info("Can't find RedKeyCluster, probably deleted", "redis-cluster", req.NamespacedName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *RedKeyClusterReconciler) updateClusterStatus(ctx context.Context, redke
}

// Update Robin ConfigMap status
err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.GetRobinStatusCodeEquivalence(redkeyCluster.Status.Status))
err = robin.PersistRobinStatus(ctx, r.Client, redkeyCluster, redkeyv1.GetRobinStatusCodeEquivalence(redkeyCluster.Status.Status))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating the new status in Robin ConfigMap", "status", redkeyCluster.Status.Status)
return err
Expand Down
4 changes: 4 additions & 0 deletions debug.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 INDUSTRIA DE DISEÑO TEXTIL, S.A. (INDITEX, S.A.)
#
# SPDX-License-Identifier: Apache-2.0

FROM golang:1.24.6

RUN go install github.com/go-delve/delve/cmd/dlv@v1.25
Expand Down
10 changes: 9 additions & 1 deletion internal/robin/robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ func (r *Robin) SetStatus(status string) error {
return nil
}

func (r *Robin) SetAndPersistRobinStatus(ctx context.Context, client ctrlClient.Client, redkeyCluster *redkeyv1.RedKeyCluster, newStatus string) error {
err := r.SetStatus(newStatus)
if err != nil {
return err
}
return PersistRobinStatus(ctx, client, redkeyCluster, newStatus)
}

func (r *Robin) GetReplicas() (int, int, error) {
url := EndpointProtocolPrefix + r.Pod.Status.PodIP + ":" + strconv.Itoa(Port) + EndpointReplicas

Expand Down Expand Up @@ -426,7 +434,7 @@ func (cn *ClusterNodes) GetReplicaNodes() []*Node {
}

// Updates configuration in Robin ConfigMap with the new status.
func PersistRobinStatut(ctx context.Context, client ctrlClient.Client, redkeyCluster *redkeyv1.RedKeyCluster, newStatus string) error {
func PersistRobinStatus(ctx context.Context, client ctrlClient.Client, redkeyCluster *redkeyv1.RedKeyCluster, newStatus string) error {
cmap := &corev1.ConfigMap{}
err := client.Get(ctx, types.NamespacedName{Name: redkeyCluster.Name + "-robin", Namespace: redkeyCluster.Namespace}, cmap)
if err != nil {
Expand Down