diff --git a/controller/failover/cluster.go b/controller/failover/cluster.go index 28b0697b..2100e074 100644 --- a/controller/failover/cluster.go +++ b/controller/failover/cluster.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "go.uber.org/atomic" "golang.org/x/net/context" "go.uber.org/zap" @@ -49,31 +50,38 @@ type Cluster struct { tasks map[string]*storage.FailoverTask tasksIdx []string - quitCh chan struct{} - closeOnce sync.Once - rw sync.RWMutex + closed atomic.Bool + ctx context.Context + cancelFn context.CancelFunc + + rw sync.RWMutex + wg sync.WaitGroup } // NewCluster return a Cluster instance and start schedule goroutine func NewCluster(ns, cluster string, stor *storage.Storage, cfg *config.FailOverConfig) *Cluster { - fn := &Cluster{ + ctx, cancelFn := context.WithCancel(context.Background()) + c := &Cluster{ namespace: ns, cluster: cluster, storage: stor, tasks: make(map[string]*storage.FailoverTask), - quitCh: make(chan struct{}), config: cfg, + ctx: ctx, + cancelFn: cancelFn, } - go fn.loop() - return fn + + c.wg.Add(1) + go c.loop() + return c } // Close will release the resource when closing -func (c *Cluster) Close() error { - c.closeOnce.Do(func() { - close(c.quitCh) - }) - return nil +func (c *Cluster) Close() { + if !c.closed.CAS(false, true) { + return + } + c.cancelFn() } func (c *Cluster) AddTask(task *storage.FailoverTask) error { @@ -145,14 +153,17 @@ func (c *Cluster) purgeTasks() { } func (c *Cluster) loop() { - ctx := context.Background() + defer c.wg.Done() + ticker := time.NewTicker(time.Duration(c.config.PingIntervalSeconds) * time.Second) defer ticker.Stop() for { select { + case <-c.ctx.Done(): + return case <-ticker.C: c.rw.RLock() - nodesCount, err := c.storage.ClusterNodesCounts(ctx, c.namespace, c.cluster) + nodesCount, err := c.storage.ClusterNodesCounts(c.ctx, c.namespace, c.cluster) if err != nil { c.rw.RUnlock() break @@ -172,17 +183,15 @@ func (c *Cluster) loop() { task := c.tasks[nodeAddr] c.removeTask(idx) if task.Type == ManualType { - c.promoteMaster(ctx, task) + c.promoteMaster(c.ctx, task) continue } - if err := util.PingCmd(ctx, &task.Node); err == nil { + if err := util.PingCmd(c.ctx, &task.Node); err == nil { continue } - c.promoteMaster(ctx, task) + c.promoteMaster(c.ctx, task) } c.rw.RUnlock() - case <-c.quitCh: - return } } } diff --git a/controller/probe/cluster.go b/controller/probe/cluster.go index 3d073c2b..65863836 100644 --- a/controller/probe/cluster.go +++ b/controller/probe/cluster.go @@ -22,6 +22,7 @@ package probe import ( "context" "errors" + "sync" "time" "github.com/apache/kvrocks-controller/controller/failover" @@ -42,6 +43,7 @@ type Cluster struct { cluster string storage *storage.Storage failOver *failover.FailOver + failureMu sync.Mutex failureCounts map[string]int64 stopCh chan struct{} } @@ -61,93 +63,90 @@ func (c *Cluster) start() { go c.loop() } -func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) (*metadata.Cluster, error) { - var latestEpoch int64 - var latestNode *metadata.NodeInfo +func (c *Cluster) probeNode(ctx context.Context, node *metadata.NodeInfo) (int64, error) { + info, err := util.ClusterInfoCmd(ctx, node) + if err != nil { + switch err.Error() { + case ErrRestoringBackUp.Error(): + // The node is restoring from backup, just skip it + return -1, nil + case ErrClusterNotInitialized.Error(): + return -1, ErrClusterNotInitialized + default: + return -1, err + } + } + return info.ClusterCurrentEpoch, nil +} - password := "" - currentClusterStr, _ := cluster.ToSlotString() - for index, 
shard := range cluster.Shards { - for _, node := range shard.Nodes { - logger := logger.Get().With( - zap.String("id", node.ID), - zap.String("role", node.Role), - zap.String("addr", node.Addr), - ) - // all nodes in the cluster should have the same password, - // so we just use the first node's password - if password == "" { - password = node.Password - } - if _, ok := c.failureCounts[node.Addr]; !ok { - c.failureCounts[node.Addr] = 0 - } - info, err := util.ClusterInfoCmd(ctx, &node) - if err != nil { - if err.Error() == ErrRestoringBackUp.Error() { - continue - } - if err.Error() == ErrClusterNotInitialized.Error() { - // Maybe the node was restarted, just re-sync the cluster info - clusterStr, _ := cluster.ToSlotString() - err = util.SyncClusterInfo2Node(ctx, &node, clusterStr, cluster.Version) - if err != nil { - logger.With(zap.Error(err)).Warn("Failed to re-sync the cluster info") - } - continue - } - c.failureCounts[node.Addr] += 1 - if c.failureCounts[node.Addr]%c.failOver.Config().MaxPingCount == 0 { - err = c.failOver.AddNode(c.namespace, c.cluster, index, node, failover.AutoType) - logger.With(zap.Error(err)).Warn("Add the node into the fail over candidates") - } else { - logger.With( - zap.Error(err), - zap.Int64("failure_count", c.failureCounts[node.Addr]), - ).Warn("Failed to ping the node") - } - continue - } - if info.ClusterCurrentEpoch < cluster.Version { - err := util.SyncClusterInfo2Node(ctx, &node, currentClusterStr, cluster.Version) - if err != nil { - logger.With( - zap.Error(err), - zap.Int64("cluster_version", cluster.Version), - zap.Int64("node_version", info.ClusterCurrentEpoch), - ).Info("Failed to sync the cluster info") - } - } +func (c *Cluster) increaseFailureCount(index int, node *metadata.NodeInfo) { + log := logger.Get().With( + zap.String("id", node.ID), + zap.String("role", node.Role), + zap.String("addr", node.Addr), + ) - if info.ClusterMyEpoch > latestEpoch { - latestEpoch = info.ClusterMyEpoch - latestNode = &node - } - c.failureCounts[node.Addr] = 0 - } + c.failureMu.Lock() + if _, ok := c.failureCounts[node.Addr]; !ok { + c.failureCounts[node.Addr] = 0 } + c.failureCounts[node.Addr] += 1 + count := c.failureCounts[node.Addr] + c.failureMu.Unlock() - if latestEpoch > cluster.Version { - latestClusterStr, err := util.ClusterNodesCmd(ctx, latestNode) + if count%c.failOver.Config().MaxPingCount == 0 { + err := c.failOver.AddNode(c.namespace, c.cluster, index, *node, failover.AutoType) if err != nil { - return nil, err - } - latestClusterInfo, err := metadata.ParseCluster(latestClusterStr) - if err != nil { - return nil, err + log.With(zap.Error(err)).Warn("Failed to add the node into the fail over candidates") + return } - latestClusterInfo.SetPassword(password) - err = c.storage.UpdateCluster(ctx, c.namespace, latestClusterInfo) - if err != nil { - return nil, err + log.Info("Add the node into the fail over candidates") + } +} + +func (c *Cluster) resetFailureCount(node *metadata.NodeInfo) { + c.failureMu.Lock() + delete(c.failureCounts, node.Addr) + c.failureMu.Unlock() +} + +func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) { + for i, shard := range cluster.Shards { + for _, node := range shard.Nodes { + go func(shardIdx int, node metadata.NodeInfo) { + log := logger.Get().With( + zap.String("id", node.ID), + zap.String("role", node.Role), + zap.String("addr", node.Addr), + ) + version, err := c.probeNode(ctx, &node) + if err != nil && !errors.Is(err, ErrClusterNotInitialized) { + c.increaseFailureCount(shardIdx, &node) + 
log.With(zap.Error(err)).Error("Failed to probe the node") + return + } + log.Debug("Probe the cluster node ") + + if version < cluster.Version { + // sync the cluster to the latest version + err := util.SyncClusterInfo2Node(ctx, &node, cluster) + if err != nil { + log.With(zap.Error(err)).Error("Failed to sync the cluster info") + } + } else if version > cluster.Version { + log.With( + zap.Int64("node.version", version), + zap.Int64("cluster.version", cluster.Version), + ).Warn("The node is in a higher version") + } + c.resetFailureCount(&node) + }(i, node) } - return latestClusterInfo, nil } - return cluster, nil } func (c *Cluster) loop() { - logger := logger.Get().With( + log := logger.Get().With( zap.String("namespace", c.namespace), zap.String("cluster", c.cluster), ) @@ -159,15 +158,12 @@ func (c *Cluster) loop() { case <-probeTicker.C: clusterInfo, err := c.storage.GetClusterInfo(ctx, c.namespace, c.cluster) if err != nil { - logger.With( + log.With( zap.Error(err), ).Error("Failed to get the cluster info from the storage") break } - if _, err := c.probe(ctx, clusterInfo); err != nil { - logger.With(zap.Error(err)).Error("Failed to probe the cluster") - break - } + c.probe(ctx, clusterInfo) case <-c.stopCh: return } diff --git a/util/redis.go b/util/redis.go index b90b5282..12045a03 100644 --- a/util/redis.go +++ b/util/redis.go @@ -370,7 +370,11 @@ func NodeInfoCmd(ctx context.Context, node *metadata.NodeInfo) (*NodeInfo, error return nodeInfo, nil } -func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterStr string, ver int64) error { +func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, cluster *metadata.Cluster) error { + clusterStr, err := cluster.ToSlotString() + if err != nil { + return err + } cli, err := GetRedisClient(ctx, node) if err != nil { return err @@ -379,7 +383,7 @@ func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterS if err != nil { return err } - err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, ver).Err() + err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, cluster.Version).Err() if err != nil { return err }
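
Note on the failover lifecycle change: the patch replaces the quit channel plus sync.Once pair with a cancellable context, an atomic flag that makes Close idempotent, and a WaitGroup around the scheduling loop. Below is a minimal, self-contained sketch of that pattern for reference. It uses the standard library's sync/atomic.Bool instead of go.uber.org/atomic, and it also waits on the WaitGroup inside Close, which the hunk above does not show; both are assumptions made for illustration only.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker mirrors the lifecycle introduced for failover.Cluster: a cancellable
// context drives the loop, an atomic flag makes Close idempotent, and a
// WaitGroup tracks the background goroutine.
type worker struct {
	closed   atomic.Bool // stdlib stand-in for go.uber.org/atomic.Bool
	ctx      context.Context
	cancelFn context.CancelFunc
	wg       sync.WaitGroup
}

func newWorker(interval time.Duration) *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{ctx: ctx, cancelFn: cancel}
	w.wg.Add(1)
	go w.loop(interval)
	return w
}

func (w *worker) loop(interval time.Duration) {
	defer w.wg.Done()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-w.ctx.Done():
			return
		case <-ticker.C:
			fmt.Println("tick") // stand-in for the probe/failover work
		}
	}
}

// Close is safe to call multiple times; only the first call cancels the
// context. Waiting for the loop here is an addition of this sketch.
func (w *worker) Close() {
	if !w.closed.CompareAndSwap(false, true) {
		return
	}
	w.cancelFn()
	w.wg.Wait()
}

func main() {
	w := newWorker(100 * time.Millisecond)
	time.Sleep(250 * time.Millisecond)
	w.Close()
	w.Close() // idempotent: the second call is a no-op
}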
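
Note on the probe refactor: each node check now runs in its own goroutine, the shared failureCounts map is guarded by failureMu, a node is escalated to the failover candidates whenever its consecutive failure count reaches a multiple of MaxPingCount, and the counter is dropped on the first successful probe. The following rough, self-contained sketch shows only that bookkeeping; the failureTracker type, the maxPingCount value, and the onFailover callback are illustrative names, not part of the patch.

package main

import (
	"fmt"
	"sync"
)

// failureTracker is an illustrative stand-in for the mutex-guarded
// failureCounts map in probe.Cluster.
type failureTracker struct {
	mu           sync.Mutex
	counts       map[string]int64
	maxPingCount int64
	onFailover   func(addr string) // e.g. failOver.AddNode in the patch
}

func newFailureTracker(maxPingCount int64, onFailover func(string)) *failureTracker {
	return &failureTracker{
		counts:       make(map[string]int64),
		maxPingCount: maxPingCount,
		onFailover:   onFailover,
	}
}

// fail records one probe failure; every maxPingCount-th consecutive failure
// escalates the node, mirroring increaseFailureCount.
func (t *failureTracker) fail(addr string) {
	t.mu.Lock()
	t.counts[addr]++
	count := t.counts[addr]
	t.mu.Unlock()

	if count%t.maxPingCount == 0 {
		t.onFailover(addr)
	}
}

// succeed clears the counter, mirroring resetFailureCount.
func (t *failureTracker) succeed(addr string) {
	t.mu.Lock()
	delete(t.counts, addr)
	t.mu.Unlock()
}

func main() {
	tr := newFailureTracker(3, func(addr string) {
		fmt.Println("escalate to failover:", addr)
	})

	// Simulate concurrent probes against one node, then a recovery on another.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tr.fail("10.0.0.1:6666")
		}()
	}
	wg.Wait()
	tr.succeed("10.0.0.2:6666")
}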
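
Note on the util/redis.go change: SyncClusterInfo2Node now renders the slot string and reads the version from the metadata.Cluster itself, so callers hand over the cluster object instead of a pre-serialized string plus version; underneath it still issues CLUSTERX SETNODES, as the hunk shows. The sketch below only illustrates that final step against a single node and is an assumption-laden simplification: it constructs a go-redis v9 client directly and uses a placeholder slot string, whereas the project obtains its client through util.GetRedisClient and the string through cluster.ToSlotString().

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// setNodes pushes a serialized topology to one kvrocks node. Creating a
// client per call is done only to keep the sketch self-contained.
func setNodes(ctx context.Context, addr, slotString string, version int64) error {
	rdb := redis.NewClient(&redis.Options{Addr: addr})
	defer rdb.Close()
	return rdb.Do(ctx, "CLUSTERX", "setnodes", slotString, version).Err()
}

func main() {
	// Placeholder values: the real slot string comes from cluster.ToSlotString().
	err := setNodes(context.Background(), "127.0.0.1:6666",
		"<output of cluster.ToSlotString()>", 1)
	fmt.Println(err)
}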