diff --git a/api/v1/redkeycluster_types.go b/api/v1/redkeycluster_types.go index 2593de5..d89e180 100644 --- a/api/v1/redkeycluster_types.go +++ b/api/v1/redkeycluster_types.go @@ -55,6 +55,15 @@ const ( StatusInitializing = "Initializing" StatusError = "Error" + RobinStatusNoReconciling = "NoReconciling" + RobinStatusUpgrading = "Upgrading" + RobinStatusScalingDown = "ScalingDown" + RobinStatusScalingUp = "ScalingUp" + RobinStatusReady = "Ready" + RobinStatusConfiguring = "Configuring" + RobinStatusInitializing = "Initializing" + RobinStatusError = "Error" + SubstatusFastUpgrading = "FastUpgrading" SubstatusEndingFastUpgrading = "EndingFastUpgrading" SubstatusSlowUpgrading = "SlowUpgrading" @@ -311,3 +320,24 @@ func IsFastOperationStatus(status RedKeyClusterSubstatus) bool { return status.Status == SubstatusFastScaling || status.Status == SubstatusEndingFastScaling || status.Status == SubstatusFastUpgrading || status.Status == SubstatusEndingFastUpgrading } + +func GetRobinStatusCodeEquivalence(redkeyClusterStatus string) string { + switch redkeyClusterStatus { + case StatusUpgrading: + return RobinStatusUpgrading + case StatusScalingDown: + return RobinStatusScalingDown + case StatusScalingUp: + return RobinStatusScalingUp + case StatusReady: + return RobinStatusReady + case StatusConfiguring: + return RobinStatusConfiguring + case StatusInitializing: + return RobinStatusInitializing + case StatusError: + return RobinStatusError + default: + return RobinStatusError + } +} diff --git a/controllers/redis_manager.go b/controllers/redis_manager.go index 2cc1f9a..6964b3e 100644 --- a/controllers/redis_manager.go +++ b/controllers/redis_manager.go @@ -257,14 +257,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas) logger := r.getHelperLogger(redkeyCluster.NamespacedName()) - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness") return err } // Set the number of replicas to Robin to have the new node met to the existing nodes. - replicas, replicasPerMaster, err := robinRedis.GetReplicas() + replicas, replicasPerMaster, err := redkeyRobin.GetReplicas() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting replicas from Robin") return err @@ -274,7 +274,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin replicas") } - err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) + err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin replicas") return err @@ -282,7 +282,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re } // Check all cluster nodes are ready from Robin. - clusterNodes, err := robinRedis.GetClusterNodes() + clusterNodes, err := redkeyRobin.GetClusterNodes() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin") return err @@ -318,14 +318,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re // Get Robin. logger := r.getHelperLogger(redkeyCluster.NamespacedName()) - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness") return err } // Check cluster health. - check, errors, warnings, err := robinRedis.ClusterCheck() + check, errors, warnings, err := redkeyRobin.ClusterCheck() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin") return err @@ -363,7 +363,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re } } else { // Move slots from partition before rolling update. - completed, err := robinRedis.MoveSlots(currentPartition, currentPartition+1, 0) + completed, err := redkeyRobin.MoveSlots(currentPartition, currentPartition+1, 0) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error moving slots", "From node", currentPartition, "To node", currentPartition+1) return err @@ -374,7 +374,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re } // Forget node - err = robinRedis.ClusterResetNode(currentPartition) + err = redkeyRobin.ClusterResetNode(currentPartition) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin resetting the node", "node index", currentPartition) return err @@ -430,14 +430,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl // Get Robin. logger := r.getHelperLogger(redkeyCluster.NamespacedName()) - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness") return err } // Check cluster health. - check, errors, warnings, err := robinRedis.ClusterCheck() + check, errors, warnings, err := redkeyRobin.ClusterCheck() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin") return err @@ -449,7 +449,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl // Move slots from extra node to node 0. extraNodeIndex := int(*(existingStatefulSet.Spec.Replicas)) - 1 - completed, err := robinRedis.MoveSlots(extraNodeIndex, 0, 0) + completed, err := redkeyRobin.MoveSlots(extraNodeIndex, 0, 0) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error moving slots", "From node", extraNodeIndex, "To node", 0) return err @@ -495,14 +495,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context, r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas) logger := r.getHelperLogger(redkeyCluster.NamespacedName()) - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin") return err } // Set the number of replicas to Robin to have the new node met to the existing nodes. - replicas, replicasPerMaster, err := robinRedis.GetReplicas() + replicas, replicasPerMaster, err := redkeyRobin.GetReplicas() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting replicas from Robin") return err @@ -512,7 +512,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context, if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin replicas") } - err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) + err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin replicas") return err @@ -520,7 +520,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context, } // Check all cluster nodes are ready from Robin. - check, errors, warnings, err := robinRedis.ClusterCheck() + check, errors, warnings, err := redkeyRobin.ClusterCheck() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin") return err @@ -678,15 +678,15 @@ func (r *RedKeyClusterReconciler) doFastScaling(ctx context.Context, redkeyClust return true, nil } - // Rebuild the cluster + // Reset the cluster robin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness") return true, err } - err = robin.ClusterFix() + err = robin.ClusterRecreate() if err != nil { - r.logError(redkeyCluster.NamespacedName(), err, "Error performing a cluster fix through Robin") + r.logError(redkeyCluster.NamespacedName(), err, "Error performing a cluster recreate through Robin") return true, err } @@ -731,20 +731,20 @@ func (r *RedKeyClusterReconciler) doFastScaling(ctx context.Context, redkeyClust return true, err } - // At this point Robin status has been updated from Ready status. Ge go back to Ready status. - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + // Set Robin status to NotReconciling. + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness") return true, err } - err = robinRedis.SetStatus(redkeyv1.StatusReady) + err = redkeyRobin.SetStatus(redkeyv1.RobinStatusNoReconciling) if err != nil { - r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin status", "status", redkeyv1.StatusReady) + r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin status", "status", redkeyv1.RobinStatusNoReconciling) return true, err } - err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.StatusReady) + err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.RobinStatusNoReconciling) if err != nil { - r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin status", "status", redkeyv1.StatusReady) + r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin status", "status", redkeyv1.RobinStatusNoReconciling) return true, err } @@ -752,7 +752,7 @@ func (r *RedKeyClusterReconciler) doFastScaling(ctx context.Context, redkeyClust r.Client.Delete(ctx, existingStatefulSet) // Update Robin replicas. - err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) + err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error setting Robin replicas", "replicas", redkeyCluster.Spec.Replicas, "replicas per master", redkeyCluster.Spec.ReplicasPerMaster) @@ -843,20 +843,20 @@ func (r *RedKeyClusterReconciler) doSlowScaling(ctx context.Context, redkeyClust func (r *RedKeyClusterReconciler) scaleDownCluster(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) (bool, error) { logger := r.getHelperLogger((redkeyCluster.NamespacedName())) - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to get cluster nodes") return true, err } // Update replicas - replicas, replicasPerMaster, err := robinRedis.GetReplicas() + replicas, replicasPerMaster, err := redkeyRobin.GetReplicas() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin replicas") return true, err } if replicas != int(redkeyCluster.Spec.Replicas) || replicasPerMaster != int(redkeyCluster.Spec.ReplicasPerMaster) { - err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) + err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin replicas", "replicas", redkeyCluster.Spec.Replicas, "replicas per master", redkeyCluster.Spec.ReplicasPerMaster) @@ -872,7 +872,7 @@ func (r *RedKeyClusterReconciler) scaleDownCluster(ctx context.Context, redkeyCl } // Check cluster replicas meet the requirements - clusterNodes, err := robinRedis.GetClusterNodes() + clusterNodes, err := redkeyRobin.GetClusterNodes() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin nodes info") return true, err @@ -979,12 +979,12 @@ func (r *RedKeyClusterReconciler) completeClusterScaleDown(ctx context.Context, func (r *RedKeyClusterReconciler) getCulledNodes(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) ([]robin.Node, error) { logger := r.getHelperLogger((redkeyCluster.NamespacedName())) - redisRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to get cluster nodes") return nil, err } - clusterNodes, err := redisRobin.GetClusterNodes() + clusterNodes, err := redkeyRobin.GetClusterNodes() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes info from Robin") return nil, err @@ -1035,7 +1035,7 @@ func (r *RedKeyClusterReconciler) scaleUpCluster(ctx context.Context, redkeyClus func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) (bool, error) { logger := r.getHelperLogger((redkeyCluster.NamespacedName())) - robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) + redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to get cluster nodes") return true, err @@ -1058,7 +1058,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re } // Update Robin with new replicas/replicasPerMaster - err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) + err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error updating replicas in Robin", "replicas", redkeyCluster.Spec.Replicas, "replicasPerMaster", redkeyCluster.Spec.ReplicasPerMaster) return true, err @@ -1083,7 +1083,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re // We will ensure that all cluster nodes are initialized before asking Robin to meet all new nodes, // forget outdated nodes, ensure slots coverage and rebalance. - clusterNodes, err := robinRedis.GetClusterNodes() + clusterNodes, err := redkeyRobin.GetClusterNodes() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin") return true, err @@ -1093,7 +1093,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re r.logInfo(redkeyCluster.NamespacedName(), "ScaleCluster - Inconsistency. Statefulset replicas equals to RedKeyCluster replicas but we have a different number of cluster nodes. Trying to fix it...", "RedKeyCluster replicas", redkeyCluster.Spec.Replicas, "Cluster nodes", masterNodes) } - err = robinRedis.ClusterFix() + err = redkeyRobin.ClusterFix() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error asking to Robin to ensure the cluster is ok") return true, err @@ -1109,7 +1109,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re case redkeyv1.SubstatusEndingScaling: // Final step: ensure the cluster is Ok. - check, errors, warnings, err := robinRedis.ClusterCheck() + check, errors, warnings, err := redkeyRobin.ClusterCheck() if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin") return true, err diff --git a/controllers/rediscluster_manager.go b/controllers/redkeycluster_manager.go similarity index 100% rename from controllers/rediscluster_manager.go rename to controllers/redkeycluster_manager.go diff --git a/controllers/rediscluster_reconciler.go b/controllers/redkeycluster_reconciler.go similarity index 100% rename from controllers/rediscluster_reconciler.go rename to controllers/redkeycluster_reconciler.go diff --git a/controllers/status.go b/controllers/status.go index 48f9c76..7d5eeea 100644 --- a/controllers/status.go +++ b/controllers/status.go @@ -66,7 +66,7 @@ func (r *RedKeyClusterReconciler) updateClusterStatus(ctx context.Context, redke return err } - err = robin.SetStatus(redkeyCluster.Status.Status) + err = robin.SetStatus(redkeyv1.GetRobinStatusCodeEquivalence(redkeyCluster.Status.Status)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error setting the new status to Robin", "status", redkeyCluster.Status.Status) return err @@ -75,7 +75,7 @@ func (r *RedKeyClusterReconciler) updateClusterStatus(ctx context.Context, redke } // Update Robin ConfigMap status - err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyCluster.Status.Status) + err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.GetRobinStatusCodeEquivalence(redkeyCluster.Status.Status)) if err != nil { r.logError(redkeyCluster.NamespacedName(), err, "Error updating the new status in Robin ConfigMap", "status", redkeyCluster.Status.Status) return err diff --git a/internal/robin/robin.go b/internal/robin/robin.go index 2168f8a..ab93500 100644 --- a/internal/robin/robin.go +++ b/internal/robin/robin.go @@ -30,25 +30,27 @@ import ( ) const ( - StatusInitializing = "Initializing" - StatutsConfiguring = "Configuring" - StatusReady = "Ready" - StatusError = "Error" - StatutsUpgrading = "Upgrading" - StatusScalingDown = "ScalingDown" - StatusScalingUp = "ScalingUp" - StatusMaintenance = "Maintenance" - StatusUnknown = "Unknown" - Port = 8080 - - EndpointProtocolPrefix = "http://" - EndpointStatus = "/v1/redkeycluster/status" - EndpointReplicas = "/v1/redkeycluster/replicas" - EndpointClusterCheck = "/v1/cluster/check" - EndpointClusterNodes = "/v1/cluster/nodes" - EndpointClusterFix = "/v1/cluster/fix" - EndpointClusterMove = "/v1/cluster/move" - EndpointClusterReset = "/v1/cluster/reset/" + StatusInitializing = "Initializing" + StatutsConfiguring = "Configuring" + StatusReady = "Ready" + StatusError = "Error" + StatutsUpgrading = "Upgrading" + StatusScalingDown = "ScalingDown" + StatusScalingUp = "ScalingUp" + StatusMaintenance = "Maintenance" + StatusNoReconciling = "NoReconciling" + StatusUnknown = "Unknown" + Port = 8080 + + EndpointProtocolPrefix = "http://" + EndpointStatus = "/v1/redkeycluster/status" + EndpointReplicas = "/v1/redkeycluster/replicas" + EndpointClusterCheck = "/v1/cluster/check" + EndpointClusterNodes = "/v1/cluster/nodes" + EndpointClusterFix = "/v1/cluster/fix" + EndpointClusterMove = "/v1/cluster/move" + EndpointClusterReset = "/v1/cluster/reset/" + EndpointClusterRecreate = "/v1/cluster/recreate" ) // Configuration is the top-level configuration struct. @@ -345,6 +347,19 @@ func (r *Robin) MoveSlots(nodeIndexFrom int, nodeIndexTo int, numSlots int) (boo } } +func (r *Robin) ClusterRecreate() error { + url := EndpointProtocolPrefix + r.Pod.Status.PodIP + ":" + strconv.Itoa(Port) + EndpointClusterRecreate + + var payload []byte + body, err := doPut(url, payload) + if err != nil { + return fmt.Errorf("cluster recreate: %w", err) + } + r.Logger.Info("Asked to Robin to recreate the cluster", "response body", string(body)) + + return nil +} + func doSimpleGet(url string) ([]byte, error) { resp, err := http.Get(url) if err != nil {