Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/client/command/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func printCluster(cluster *store.Cluster) {
role = strings.ToUpper(store.RoleMaster)
}
migratingStatus := "NO"
if shard.MigratingSlot != -1 {
migratingStatus = fmt.Sprintf("%d --> %d", shard.MigratingSlot, shard.TargetShardIndex)
if shard.MigratingSlot != nil {
migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex)
}
columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus}
writer.Append(columns)
Expand Down
16 changes: 7 additions & 9 deletions cmd/client/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
"strconv"
"strings"

"github.com/apache/kvrocks-controller/store"
"github.com/spf13/cobra"
)

type MigrationOptions struct {
namespace string
cluster string
slot int
slot string
target int
slotOnly bool
}
Expand Down Expand Up @@ -69,14 +70,11 @@ func migrationPreRun(_ *cobra.Command, args []string) error {
if len(args) < 2 {
return fmt.Errorf("the slot number should be specified")
}
slot, err := strconv.Atoi(args[1])
_, err := store.ParseSlotRange(args[1])
if err != nil {
return fmt.Errorf("invalid slot number: %s", args[1])
return fmt.Errorf("invalid slot number: %s, error: %w", args[1], err)
}
if slot < 0 || slot > 16383 {
return errors.New("slot number should be in range [0, 16383]")
}
migrateOptions.slot = slot
migrateOptions.slot = args[1]

if migrateOptions.namespace == "" {
return fmt.Errorf("namespace is required, please specify with -n or --namespace")
Expand Down Expand Up @@ -106,12 +104,12 @@ func migrateSlot(client *client, options *MigrationOptions) error {
if rsp.IsError() {
return errors.New(rsp.String())
}
printLine("migrate slot[%d] task is submitted successfully.", options.slot)
printLine("migrate slot[%s] task is submitted successfully.", options.slot)
return nil
}

func init() {
MigrateCommand.Flags().IntVar(&migrateOptions.slot, "slot", -1, "The slot to migrate")
MigrateCommand.Flags().StringVar(&migrateOptions.slot, "slot", "", "The slot to migrate")
MigrateCommand.Flags().IntVar(&migrateOptions.target, "target", -1, "The target node")
MigrateCommand.Flags().StringVarP(&migrateOptions.namespace, "namespace", "n", "", "The namespace")
MigrateCommand.Flags().StringVarP(&migrateOptions.cluster, "cluster", "c", "", "The cluster")
Expand Down
30 changes: 16 additions & 14 deletions consts/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ package consts
import "errors"

var (
ErrInvalidArgument = errors.New("invalid argument")
ErrNotFound = errors.New("not found")
ErrForbidden = errors.New("forbidden")
ErrAlreadyExists = errors.New("already exists")
ErrIndexOutOfRange = errors.New("index out of range")
ErrShardIsSame = errors.New("source and target shard is same")
ErrSlotOutOfRange = errors.New("slot out of range")
ErrSlotNotBelongToAnyShard = errors.New("slot not belong to any shard")
ErrNodeIsNotMaster = errors.New("the old node is not master")
ErrOldMasterNodeNotFound = errors.New("old master node not found")
ErrShardNoReplica = errors.New("no replica in shard")
ErrShardIsServicing = errors.New("shard is servicing")
ErrShardSlotIsMigrating = errors.New("shard slot is migrating")
ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
ErrInvalidArgument = errors.New("invalid argument")
ErrNotFound = errors.New("not found")
ErrForbidden = errors.New("forbidden")
ErrAlreadyExists = errors.New("already exists")
ErrIndexOutOfRange = errors.New("index out of range")
ErrShardIsSame = errors.New("source and target shard is same")
ErrSlotOutOfRange = errors.New("slot out of range")
ErrSlotNotBelongToAnyShard = errors.New("slot not belong to any shard")
ErrSlotRangeBelongsToMultipleShards = errors.New("slot range belongs to multiple shards")
ErrNodeIsNotMaster = errors.New("the old node is not master")
ErrOldMasterNodeNotFound = errors.New("old master node not found")
ErrShardNoReplica = errors.New("no replica in shard")
ErrShardIsServicing = errors.New("shard is servicing")
ErrShardSlotIsMigrating = errors.New("shard slot is migrating")
ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal")
)
21 changes: 14 additions & 7 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,16 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
log.Error("Failed to get the cluster info from the source node", zap.Error(err))
return
}
if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
if sourceNodeClusterInfo.MigratingSlot == nil {
log.Error("The source migration slot is empty",
zap.String("migrating_slot", shard.MigratingSlot.String()),
)
return
}
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) {
log.Error("Mismatch migrating slot",
zap.Int("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot),
zap.Int("migrating_slot", shard.MigratingSlot),
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
zap.String("migrating_slot", shard.MigratingSlot.String()),
)
return
}
Expand All @@ -347,17 +353,18 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
return
}
c.updateCluster(clonedCluster)
log.Warn("Failed to migrate the slot", zap.Int("slot", migratingSlot))
log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
case "success":
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot)
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot,
)
migratedSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
} else {
log.Info("Migrate the slot successfully", zap.Int("slot", migratedSlot))
log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
}
c.updateCluster(clonedCluster)
default:
Expand Down
6 changes: 4 additions & 2 deletions controller/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) {
mockNode0, mockNode1, mockNode2, mockNode3,
},
SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}},
MigratingSlot: -1,
MigratingSlot: nil,
TargetShardIndex: -1,
}},
}
Expand Down Expand Up @@ -219,7 +219,9 @@ func TestCluster_MigrateSlot(t *testing.T) {
defer func() {
require.NoError(t, cluster.Reset(ctx))
}()
require.NoError(t, cluster.MigrateSlot(ctx, 0, 1, false))
slotRange, err := store.NewSlotRange(0, 0)
require.NoError(t, err)
require.NoError(t, cluster.MigrateSlot(ctx, *slotRange, 1, false))

s := NewMockClusterStore()
require.NoError(t, s.CreateCluster(ctx, ns, cluster))
Expand Down
6 changes: 3 additions & 3 deletions server/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
)

type MigrateSlotRequest struct {
Target int `json:"target" validate:"required"`
Slot int `json:"slot" validate:"required"`
SlotOnly bool `json:"slot_only"`
Target int `json:"target" validate:"required"`
Slot store.SlotRange `json:"slot" validate:"required"`
SlotOnly bool `json:"slot_only"`
}

type CreateClusterRequest struct {
Expand Down
16 changes: 10 additions & 6 deletions server/api/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ func TestClusterBasics(t *testing.T) {
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}}
slotRange, err := store.NewSlotRange(3, 3)
require.NoError(t, err)
testMigrateReq := &MigrateSlotRequest{
Slot: 3,
Slot: *slotRange,
SlotOnly: true,
Target: 1,
}
Expand Down Expand Up @@ -163,7 +165,6 @@ func TestClusterBasics(t *testing.T) {
runRemove(t, "test-cluster", http.StatusNoContent)
runRemove(t, "not-exist", http.StatusNotFound)
})

}

func TestClusterImport(t *testing.T) {
Expand Down Expand Up @@ -233,8 +234,10 @@ func TestClusterMigrateData(t *testing.T) {
reqCtx := GetTestContext(recorder)
reqCtx.Set(consts.ContextKeyStore, handler.s)
reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}}
slotRange, err := store.NewSlotRange(0, 0)
require.NoError(t, err)
testMigrateReq := &MigrateSlotRequest{
Slot: 0,
Slot: *slotRange,
Target: 1,
}
body, err := json.Marshal(testMigrateReq)
Expand All @@ -248,14 +251,15 @@ func TestClusterMigrateData(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 1, gotCluster.Version.Load())
require.Len(t, gotCluster.Shards[0].SlotRanges, 1)
require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot)
require.EqualValues(t, &store.SlotRange{Start: 0, Stop: 0}, gotCluster.Shards[0].MigratingSlot)
require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)

ctrl, err := controller.New(handler.s.(*store.ClusterStore), &config.ControllerConfig{
FailOver: &config.FailOverConfig{
PingIntervalSeconds: 1,
MaxPingCount: 3,
}})
},
})
require.NoError(t, err)
require.NoError(t, ctrl.Start(ctx))
ctrl.WaitForReady()
Expand All @@ -268,6 +272,6 @@ func TestClusterMigrateData(t *testing.T) {
if err != nil {
return false
}
return gotCluster.Shards[0].MigratingSlot == -1
return gotCluster.Shards[0].MigratingSlot == nil
}, 10*time.Second, 100*time.Millisecond)
}
11 changes: 7 additions & 4 deletions server/api/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func TestShardBasics(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: clusterName},
{Key: "shard", Value: strconv.Itoa(shardIndex)}}
{Key: "shard", Value: strconv.Itoa(shardIndex)},
}

middleware.RequiredClusterShard(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
Expand All @@ -103,7 +104,8 @@ func TestShardBasics(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: clusterName},
{Key: "shard", Value: "1"}}
{Key: "shard", Value: "1"},
}

middleware.RequiredClusterShard(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
Expand All @@ -124,7 +126,7 @@ func TestShardBasics(t *testing.T) {
nodeAddrs = append(nodeAddrs, node.Addr())
}
require.ElementsMatch(t, []string{"127.0.0.1:1235", "127.0.0.1:1236"}, nodeAddrs)
require.EqualValues(t, -1, rsp.Data.Shard.MigratingSlot)
require.Nil(t, rsp.Data.Shard.MigratingSlot)
require.EqualValues(t, -1, rsp.Data.Shard.TargetShardIndex)
})

Expand Down Expand Up @@ -172,7 +174,8 @@ func TestClusterFailover(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: clusterName},
{Key: "shard", Value: strconv.Itoa(shardIndex)}}
{Key: "shard", Value: strconv.Itoa(shardIndex)},
}

middleware.RequiredClusterShard(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
Expand Down
18 changes: 9 additions & 9 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (cluster *Cluster) RemoveNode(shardIndex int, nodeID string) error {
}

func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
shardIdx int, masterNodeID, preferredNodeID string) (string, error) {
shardIdx int, masterNodeID, preferredNodeID string,
) (string, error) {
shard, err := cluster.GetShard(shardIdx)
if err != nil {
return "", err
Expand Down Expand Up @@ -175,17 +176,16 @@ func (cluster *Cluster) Reset(ctx context.Context) error {
return nil
}

func (cluster *Cluster) findShardIndexBySlot(slot int) (int, error) {
if slot < 0 || slot > MaxSlotID {
return -1, consts.ErrSlotOutOfRange
}
func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) {
sourceShardIdx := -1
for i := 0; i < len(cluster.Shards); i++ {
slotRanges := cluster.Shards[i].SlotRanges
for _, slotRange := range slotRanges {
if slotRange.Contains(slot) {
if slotRange.HasOverlap(&slot) {
if sourceShardIdx != -1 {
return sourceShardIdx, consts.ErrSlotRangeBelongsToMultipleShards
}
sourceShardIdx = i
break
}
}
}
Expand All @@ -195,7 +195,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot int) (int, error) {
return sourceShardIdx, nil
}

func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardIdx int, slotOnly bool) error {
func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetShardIdx int, slotOnly bool) error {
if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) {
return consts.ErrIndexOutOfRange
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId
}

// Will start the data migration in the background
cluster.Shards[sourceShardIdx].MigratingSlot = slot
cluster.Shards[sourceShardIdx].MigratingSlot = &slot
cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions store/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Node interface {
GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
SyncClusterInfo(ctx context.Context, cluster *Cluster) error
CheckClusterMode(ctx context.Context) (int64, error)
MigrateSlot(ctx context.Context, slot int, NodeID string) error
MigrateSlot(ctx context.Context, slot SlotRange, NodeID string) error

MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error
Expand All @@ -85,9 +85,9 @@ type ClusterNode struct {
}

type ClusterInfo struct {
CurrentEpoch int64 `json:"cluster_current_epoch"`
MigratingSlot int `json:"migrating_slot"`
MigratingState string `json:"migrating_state"`
CurrentEpoch int64 `json:"cluster_current_epoch"`
MigratingSlot *SlotRange `json:"migrating_slot"`
MigratingState string `json:"migrating_state"`
}

type ClusterNodeInfo struct {
Expand Down Expand Up @@ -195,7 +195,7 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
}
case "migrating_slot", "migrating_slot(s)":
// TODO(@git-hulk): handle multiple migrating slots
clusterInfo.MigratingSlot, err = strconv.Atoi(fields[1])
clusterInfo.MigratingSlot, err = ParseSlotRange(fields[1])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -257,8 +257,8 @@ func (n *ClusterNode) Reset(ctx context.Context) error {
return n.GetClient().ClusterResetHard(ctx).Err()
}

func (n *ClusterNode) MigrateSlot(ctx context.Context, slot int, targetNodeID string) error {
return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot, targetNodeID).Err()
func (n *ClusterNode) MigrateSlot(ctx context.Context, slot SlotRange, targetNodeID string) error {
return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot.String(), targetNodeID).Err()
}

func (n *ClusterNode) MarshalJSON() ([]byte, error) {
Expand Down
Loading
Loading