47 changes: 28 additions & 19 deletions controller/failover/cluster.go
@@ -23,6 +23,7 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"golang.org/x/net/context"

"go.uber.org/zap"
@@ -49,31 +50,38 @@ type Cluster struct {
tasks map[string]*storage.FailoverTask
tasksIdx []string

quitCh chan struct{}
closeOnce sync.Once
rw sync.RWMutex
closed atomic.Bool
ctx context.Context
cancelFn context.CancelFunc

rw sync.RWMutex
wg sync.WaitGroup
}

// NewCluster return a Cluster instance and start schedule goroutine
func NewCluster(ns, cluster string, stor *storage.Storage, cfg *config.FailOverConfig) *Cluster {
fn := &Cluster{
ctx, cancelFn := context.WithCancel(context.Background())
c := &Cluster{
namespace: ns,
cluster: cluster,
storage: stor,
tasks: make(map[string]*storage.FailoverTask),
quitCh: make(chan struct{}),
config: cfg,
ctx: ctx,
cancelFn: cancelFn,
}
go fn.loop()
return fn

c.wg.Add(1)
go c.loop()
return c
}

// Close will release the resource when closing
func (c *Cluster) Close() error {
c.closeOnce.Do(func() {
close(c.quitCh)
})
return nil
func (c *Cluster) Close() {
if !c.closed.CAS(false, true) {
return
}
c.cancelFn()
}

func (c *Cluster) AddTask(task *storage.FailoverTask) error {
@@ -145,14 +153,17 @@ func (c *Cluster) purgeTasks() {
}

func (c *Cluster) loop() {
ctx := context.Background()
defer c.wg.Done()

ticker := time.NewTicker(time.Duration(c.config.PingIntervalSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.rw.RLock()
nodesCount, err := c.storage.ClusterNodesCounts(ctx, c.namespace, c.cluster)
nodesCount, err := c.storage.ClusterNodesCounts(c.ctx, c.namespace, c.cluster)
if err != nil {
c.rw.RUnlock()
break
@@ -172,17 +183,15 @@ func (c *Cluster) loop() {
task := c.tasks[nodeAddr]
c.removeTask(idx)
if task.Type == ManualType {
c.promoteMaster(ctx, task)
c.promoteMaster(c.ctx, task)
continue
}
if err := util.PingCmd(ctx, &task.Node); err == nil {
if err := util.PingCmd(c.ctx, &task.Node); err == nil {
continue
}
c.promoteMaster(ctx, task)
c.promoteMaster(c.ctx, task)
}
c.rw.RUnlock()
case <-c.quitCh:
return
}
}
}
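Note on the shutdown change above: the quitCh/closeOnce pair is replaced by a cancellable context guarded by an atomic.Bool, plus a sync.WaitGroup for the schedule goroutine. A minimal, self-contained sketch of that lifecycle pattern — the worker type below is illustrative and not code from this PR, and it waits on the WaitGroup inside Close, which the real Cluster may do elsewhere:

```go
package main

import (
	"context"
	"sync"
	"time"

	"go.uber.org/atomic"
)

// worker sketches the shutdown pattern the diff adopts: a cancellable
// context stops the loop, an atomic.Bool makes Close idempotent, and a
// WaitGroup tracks the background goroutine.
type worker struct {
	ctx      context.Context
	cancelFn context.CancelFunc
	closed   atomic.Bool
	wg       sync.WaitGroup
}

func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{ctx: ctx, cancelFn: cancel}
	w.wg.Add(1)
	go w.loop()
	return w
}

func (w *worker) loop() {
	defer w.wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-w.ctx.Done():
			return
		case <-ticker.C:
			// periodic work (pinging nodes, promoting masters, ...) goes here
		}
	}
}

// Close cancels the context exactly once; this sketch also waits for the
// loop to exit before returning.
func (w *worker) Close() {
	if !w.closed.CAS(false, true) {
		return
	}
	w.cancelFn()
	w.wg.Wait()
}

func main() {
	w := newWorker()
	time.Sleep(250 * time.Millisecond)
	w.Close()
	w.Close() // second call is a no-op
}
```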
158 changes: 77 additions & 81 deletions controller/probe/cluster.go
@@ -22,6 +22,7 @@ package probe
import (
"context"
"errors"
"sync"
"time"

"github.com/apache/kvrocks-controller/controller/failover"
@@ -42,6 +43,7 @@ type Cluster struct {
cluster string
storage *storage.Storage
failOver *failover.FailOver
failureMu sync.Mutex
failureCounts map[string]int64
stopCh chan struct{}
}
@@ -61,93 +63,90 @@ func (c *Cluster) start() {
go c.loop()
}

func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) (*metadata.Cluster, error) {
var latestEpoch int64
var latestNode *metadata.NodeInfo
func (c *Cluster) probeNode(ctx context.Context, node *metadata.NodeInfo) (int64, error) {
info, err := util.ClusterInfoCmd(ctx, node)
if err != nil {
switch err.Error() {
case ErrRestoringBackUp.Error():
// The node is restoring from backup, just skip it
return -1, nil
case ErrClusterNotInitialized.Error():
return -1, ErrClusterNotInitialized
default:
return -1, err
}
}
return info.ClusterCurrentEpoch, nil
}

password := ""
currentClusterStr, _ := cluster.ToSlotString()
for index, shard := range cluster.Shards {
for _, node := range shard.Nodes {
logger := logger.Get().With(
zap.String("id", node.ID),
zap.String("role", node.Role),
zap.String("addr", node.Addr),
)
// all nodes in the cluster should have the same password,
// so we just use the first node's password
if password == "" {
password = node.Password
}
if _, ok := c.failureCounts[node.Addr]; !ok {
c.failureCounts[node.Addr] = 0
}
info, err := util.ClusterInfoCmd(ctx, &node)
if err != nil {
if err.Error() == ErrRestoringBackUp.Error() {
continue
}
if err.Error() == ErrClusterNotInitialized.Error() {
// Maybe the node was restarted, just re-sync the cluster info
clusterStr, _ := cluster.ToSlotString()
err = util.SyncClusterInfo2Node(ctx, &node, clusterStr, cluster.Version)
if err != nil {
logger.With(zap.Error(err)).Warn("Failed to re-sync the cluster info")
}
continue
}
c.failureCounts[node.Addr] += 1
if c.failureCounts[node.Addr]%c.failOver.Config().MaxPingCount == 0 {
err = c.failOver.AddNode(c.namespace, c.cluster, index, node, failover.AutoType)
logger.With(zap.Error(err)).Warn("Add the node into the fail over candidates")
} else {
logger.With(
zap.Error(err),
zap.Int64("failure_count", c.failureCounts[node.Addr]),
).Warn("Failed to ping the node")
}
continue
}
if info.ClusterCurrentEpoch < cluster.Version {
err := util.SyncClusterInfo2Node(ctx, &node, currentClusterStr, cluster.Version)
if err != nil {
logger.With(
zap.Error(err),
zap.Int64("cluster_version", cluster.Version),
zap.Int64("node_version", info.ClusterCurrentEpoch),
).Info("Failed to sync the cluster info")
}
}
func (c *Cluster) increaseFailureCount(index int, node *metadata.NodeInfo) {
log := logger.Get().With(
zap.String("id", node.ID),
zap.String("role", node.Role),
zap.String("addr", node.Addr),
)

if info.ClusterMyEpoch > latestEpoch {
latestEpoch = info.ClusterMyEpoch
latestNode = &node
}
c.failureCounts[node.Addr] = 0
}
c.failureMu.Lock()
if _, ok := c.failureCounts[node.Addr]; !ok {
c.failureCounts[node.Addr] = 0
}
c.failureCounts[node.Addr] += 1
count := c.failureCounts[node.Addr]
c.failureMu.Unlock()

if latestEpoch > cluster.Version {
latestClusterStr, err := util.ClusterNodesCmd(ctx, latestNode)
if count%c.failOver.Config().MaxPingCount == 0 {
err := c.failOver.AddNode(c.namespace, c.cluster, index, *node, failover.AutoType)
if err != nil {
return nil, err
}
latestClusterInfo, err := metadata.ParseCluster(latestClusterStr)
if err != nil {
return nil, err
log.With(zap.Error(err)).Warn("Failed to add the node into the fail over candidates")
return
}
latestClusterInfo.SetPassword(password)
err = c.storage.UpdateCluster(ctx, c.namespace, latestClusterInfo)
if err != nil {
return nil, err
log.Info("Add the node into the fail over candidates")
}
}

func (c *Cluster) resetFailureCount(node *metadata.NodeInfo) {
c.failureMu.Lock()
delete(c.failureCounts, node.Addr)
c.failureMu.Unlock()
}

func (c *Cluster) probe(ctx context.Context, cluster *metadata.Cluster) {
for i, shard := range cluster.Shards {
for _, node := range shard.Nodes {
go func(shardIdx int, node metadata.NodeInfo) {
log := logger.Get().With(
zap.String("id", node.ID),
zap.String("role", node.Role),
zap.String("addr", node.Addr),
)
version, err := c.probeNode(ctx, &node)
if err != nil && !errors.Is(err, ErrClusterNotInitialized) {
c.increaseFailureCount(shardIdx, &node)
log.With(zap.Error(err)).Error("Failed to probe the node")
return
}
log.Debug("Probe the cluster node ")

if version < cluster.Version {
// sync the cluster to the latest version
err := util.SyncClusterInfo2Node(ctx, &node, cluster)
if err != nil {
log.With(zap.Error(err)).Error("Failed to sync the cluster info")
}
} else if version > cluster.Version {
log.With(
zap.Int64("node.version", version),
zap.Int64("cluster.version", cluster.Version),
).Warn("The node is in a higher version")
}
c.resetFailureCount(&node)
}(i, node)
}
return latestClusterInfo, nil
}
return cluster, nil
}

func (c *Cluster) loop() {
logger := logger.Get().With(
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.cluster),
)
@@ -159,15 +158,12 @@ func (c *Cluster) loop() {
case <-probeTicker.C:
clusterInfo, err := c.storage.GetClusterInfo(ctx, c.namespace, c.cluster)
if err != nil {
logger.With(
log.With(
zap.Error(err),
).Error("Failed to get the cluster info from the storage")
break
}
if _, err := c.probe(ctx, clusterInfo); err != nil {
logger.With(zap.Error(err)).Error("Failed to probe the cluster")
break
}
c.probe(ctx, clusterInfo)
case <-c.stopCh:
return
}
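Note on the probe refactor above: probe() now launches one goroutine per node, so the failure counters move behind failureMu. A minimal sketch of that guarded-counter pattern in isolation — failureTracker and its method names are illustrative, not code from this PR:

```go
package main

import (
	"fmt"
	"sync"
)

// failureTracker sketches the mutex-guarded counter introduced by the diff:
// per-node probe goroutines run concurrently, so every read/write of the
// shared failure-count map happens under a mutex.
type failureTracker struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newFailureTracker() *failureTracker {
	return &failureTracker{counts: make(map[string]int64)}
}

// increase bumps the counter and returns the new value so the caller can
// decide, outside the lock, whether to start a failover.
func (t *failureTracker) increase(addr string) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.counts[addr]++
	return t.counts[addr]
}

// reset forgets a node once it answers a probe again.
func (t *failureTracker) reset(addr string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.counts, addr)
}

func main() {
	t := newFailureTracker()
	const maxPingCount = 3
	for i := 0; i < 4; i++ {
		if n := t.increase("127.0.0.1:6666"); n%maxPingCount == 0 {
			fmt.Println("would add the node to the failover candidates")
		}
	}
	t.reset("127.0.0.1:6666")
}
```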
8 changes: 6 additions & 2 deletions util/redis.go
@@ -370,7 +370,11 @@ func NodeInfoCmd(ctx context.Context, node *metadata.NodeInfo) (*NodeInfo, error
return nodeInfo, nil
}

func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterStr string, ver int64) error {
func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, cluster *metadata.Cluster) error {
clusterStr, err := cluster.ToSlotString()
if err != nil {
return err
}
cli, err := GetRedisClient(ctx, node)
if err != nil {
return err
@@ -379,7 +383,7 @@ func SyncClusterInfo2Node(ctx context.Context, node *metadata.NodeInfo, clusterS
if err != nil {
return err
}
err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, ver).Err()
err = cli.Do(ctx, "CLUSTERX", "setnodes", clusterStr, cluster.Version).Err()
if err != nil {
return err
}
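Note on the signature change above: SyncClusterInfo2Node now receives the whole *metadata.Cluster and derives the slot string and version itself. A hedged sketch of the new call shape — the syncNode wrapper is hypothetical, and the import paths are assumed from the module path visible in this diff:

```go
package example

import (
	"context"

	// Import paths assumed from the module path "github.com/apache/kvrocks-controller".
	"github.com/apache/kvrocks-controller/metadata"
	"github.com/apache/kvrocks-controller/util"
)

// syncNode is a hypothetical call site showing the new shape of
// SyncClusterInfo2Node: the caller hands over the cluster object and the
// helper flattens the topology and picks the version internally.
func syncNode(ctx context.Context, node *metadata.NodeInfo, cluster *metadata.Cluster) error {
	// Old call shape (before this PR):
	//   clusterStr, _ := cluster.ToSlotString()
	//   return util.SyncClusterInfo2Node(ctx, node, clusterStr, cluster.Version)
	return util.SyncClusterInfo2Node(ctx, node, cluster)
}
```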