2 changes: 1 addition & 1 deletion cmd/client/command/helper.go
@@ -51,7 +51,7 @@ func printCluster(cluster *store.Cluster) {
role = strings.ToUpper(store.RoleMaster)
}
migratingStatus := "NO"
if shard.MigratingSlot != nil {
if shard.IsMigrating() {
migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex)
}
columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus}
13 changes: 3 additions & 10 deletions controller/cluster.go
@@ -318,19 +318,12 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
if !shard.IsMigrating() {
continue
}

sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
if err != nil {
log.Error("Failed to get the cluster info from the source node", zap.Error(err))
return
}
if sourceNodeClusterInfo.MigratingSlot == nil {
log.Error("The source migration slot is empty",
zap.String("migrating_slot", shard.MigratingSlot.String()),
)
return
}
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) {
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
log.Error("Mismatch migrating slot",
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
zap.String("migrating_slot", shard.MigratingSlot.String()),
@@ -355,9 +348,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
c.updateCluster(clonedCluster)
log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
case "success":
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot)
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot,
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange,
)
migratedSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
4 changes: 2 additions & 2 deletions controller/cluster_test.go
@@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) {
mockNode0, mockNode1, mockNode2, mockNode3,
},
SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}},
MigratingSlot: nil,
MigratingSlot: &store.MigratingSlot{IsMigrating: false},
TargetShardIndex: -1,
}},
}
@@ -221,7 +221,7 @@ func TestCluster_MigrateSlot(t *testing.T) {
}()
slotRange, err := store.NewSlotRange(0, 0)
require.NoError(t, err)
require.NoError(t, cluster.MigrateSlot(ctx, *slotRange, 1, false))
require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false))

s := NewMockClusterStore()
require.NoError(t, s.CreateCluster(ctx, ns, cluster))
2 changes: 1 addition & 1 deletion server/api/cluster.go
@@ -33,7 +33,7 @@ import (

type MigrateSlotRequest struct {
Target int `json:"target" validate:"required"`
Slot store.SlotRange `json:"slot" validate:"required"`
Slot store.SlotRange `json:"slot" validate:"required"` // we don't use store.MigratingSlot here because we expect a valid SlotRange
SlotOnly bool `json:"slot_only"`
}

4 changes: 2 additions & 2 deletions server/api/cluster_test.go
@@ -131,7 +131,7 @@ func TestClusterBasics(t *testing.T) {
slotRange, err := store.NewSlotRange(3, 3)
require.NoError(t, err)
testMigrateReq := &MigrateSlotRequest{
Slot: *slotRange,
Slot: slotRange,
SlotOnly: true,
Target: 1,
}
@@ -271,7 +271,7 @@ func TestClusterMigrateData(t *testing.T) {
currentVersion := gotCluster.Version.Load()
sourceSlotRanges := gotCluster.Shards[0].SlotRanges
targetSlotRanges := gotCluster.Shards[1].SlotRanges
require.EqualValues(t, slotRange, *gotCluster.Shards[0].MigratingSlot)
require.EqualValues(t, slotRange, gotCluster.Shards[0].MigratingSlot.SlotRange)
require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)

// Run the controller to check and update the migration status
4 changes: 2 additions & 2 deletions store/cluster.go
@@ -181,7 +181,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) {
for i := 0; i < len(cluster.Shards); i++ {
slotRanges := cluster.Shards[i].SlotRanges
for _, slotRange := range slotRanges {
if slotRange.HasOverlap(&slot) {
if slotRange.HasOverlap(slot) {
if sourceShardIdx != -1 {
return sourceShardIdx, consts.ErrSlotRangeBelongsToMultipleShards
}
@@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS
}

// Will start the data migration in the background
cluster.Shards[sourceShardIdx].MigratingSlot = &slot
cluster.Shards[sourceShardIdx].MigratingSlot = FromSlotRange(slot)
cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx
return nil
}
9 changes: 5 additions & 4 deletions store/cluster_node.go
@@ -85,9 +85,9 @@ type ClusterNode struct {
}

type ClusterInfo struct {
CurrentEpoch int64 `json:"cluster_current_epoch"`
MigratingSlot *SlotRange `json:"migrating_slot"`
MigratingState string `json:"migrating_state"`
CurrentEpoch int64 `json:"cluster_current_epoch"`
MigratingSlot *MigratingSlot `json:"migrating_slot"`
MigratingState string `json:"migrating_state"`
}

type ClusterNodeInfo struct {
@@ -195,10 +195,11 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
}
case "migrating_slot", "migrating_slot(s)":
// TODO(@git-hulk): handle multiple migrating slots
clusterInfo.MigratingSlot, err = ParseSlotRange(fields[1])
slotRange, err := ParseSlotRange(fields[1])
if err != nil {
return nil, err
}
clusterInfo.MigratingSlot = FromSlotRange(*slotRange)
case "migrating_state":
clusterInfo.MigratingState = fields[1]
}
20 changes: 13 additions & 7 deletions store/cluster_shard.go
@@ -33,11 +33,17 @@ import (
"github.com/apache/kvrocks-controller/consts"
)

const (
// the old migrating slot was denoted by an int and -1 was
// used to denote a non migrating slot
NotMigratingInt = -1
)

type Shard struct {
Nodes []Node `json:"nodes"`
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
MigratingSlot *SlotRange `json:"migrating_slot"`
Nodes []Node `json:"nodes"`
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
MigratingSlot *MigratingSlot `json:"migrating_slot"`
}

type Shards []*Shard
@@ -112,7 +118,7 @@ func (shard *Shard) addNode(addr, role, password string) error {
}

func (shard *Shard) IsMigrating() bool {
return shard.MigratingSlot != nil && shard.TargetShardIndex != -1
return shard.MigratingSlot != nil && shard.MigratingSlot.IsMigrating && shard.TargetShardIndex != -1
}

func (shard *Shard) GetMasterNode() Node {
@@ -206,7 +212,7 @@ func (shard *Shard) promoteNewMaster(ctx context.Context, masterNodeID, preferre
return preferredNewMasterNode.ID(), nil
}

func (shard *Shard) HasOverlap(slotRange *SlotRange) bool {
func (shard *Shard) HasOverlap(slotRange SlotRange) bool {
for _, shardSlotRange := range shard.SlotRanges {
if shardSlotRange.HasOverlap(slotRange) {
return true
@@ -261,7 +267,7 @@ func (shard *Shard) UnmarshalJSON(bytes []byte) error {
var data struct {
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
MigratingSlot *SlotRange `json:"migrating_slot"`
MigratingSlot *MigratingSlot `json:"migrating_slot"`
Nodes []*ClusterNode `json:"nodes"`
}
if err := json.Unmarshal(bytes, &data); err != nil {
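(Not part of the diff.) A minimal sketch of how the new IsMigrating() check behaves with the MigratingSlot type, assuming the store package API introduced in this PR (NewShard, NewSlotRange, FromSlotRange):

package main

import (
	"fmt"

	"github.com/apache/kvrocks-controller/store"
)

func main() {
	shard := store.NewShard()
	shard.SlotRanges = []store.SlotRange{{Start: 0, Stop: 100}}

	// A nil MigratingSlot, or one whose IsMigrating flag is false (e.g. decoded
	// from a legacy "-1" value), means the shard is not migrating.
	fmt.Println(shard.IsMigrating()) // false

	// Starting a migration records both the slot and the target shard index.
	slotRange, err := store.NewSlotRange(42, 42)
	if err != nil {
		panic(err)
	}
	shard.MigratingSlot = store.FromSlotRange(slotRange)
	shard.TargetShardIndex = 1
	fmt.Println(shard.IsMigrating()) // true
}

The same FromSlotRange helper is what Cluster.MigrateSlot now uses to record a migration on the source shard.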
17 changes: 11 additions & 6 deletions store/cluster_shard_test.go
@@ -29,11 +29,11 @@ import (

func TestShard_HasOverlap(t *testing.T) {
shard := NewShard()
slotRange := &SlotRange{Start: 0, Stop: 100}
shard.SlotRanges = append(shard.SlotRanges, *slotRange)
slotRange := SlotRange{Start: 0, Stop: 100}
shard.SlotRanges = append(shard.SlotRanges, slotRange)
require.True(t, shard.HasOverlap(slotRange))
require.True(t, shard.HasOverlap(&SlotRange{Start: 50, Stop: 150}))
require.False(t, shard.HasOverlap(&SlotRange{Start: 101, Stop: 150}))
require.True(t, shard.HasOverlap(SlotRange{Start: 50, Stop: 150}))
require.False(t, shard.HasOverlap(SlotRange{Start: 101, Stop: 150}))
}

func TestShard_Sort(t *testing.T) {
@@ -56,17 +56,22 @@ func TestShard_Sort(t *testing.T) {
func TestShard_IsServicing(t *testing.T) {
var err error
shard := NewShard()
shard.TargetShardIndex = 0
shard.MigratingSlot = &MigratingSlot{IsMigrating: false}
require.False(t, shard.IsServicing())

shard.TargetShardIndex = 0
shard.MigratingSlot = nil
require.False(t, shard.IsServicing())

shard.TargetShardIndex = 0
shard.MigratingSlot, err = NewSlotRange(1, 1)
slotRange, err := NewSlotRange(1, 1)
require.Nil(t, err)
shard.MigratingSlot = FromSlotRange(slotRange)
require.True(t, shard.IsServicing())

shard.TargetShardIndex = -1
shard.MigratingSlot = nil
shard.MigratingSlot = &MigratingSlot{IsMigrating: false}
shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
require.True(t, shard.IsServicing())

6 changes: 3 additions & 3 deletions store/cluster_test.go
@@ -44,19 +44,19 @@ func TestCluster_FindIndexShardBySlot(t *testing.T) {

slotRange, err := NewSlotRange(0, 0)
require.NoError(t, err)
shard, err := cluster.findShardIndexBySlot(*slotRange)
shard, err := cluster.findShardIndexBySlot(slotRange)
require.NoError(t, err)
require.Equal(t, 0, shard)

slotRange, err = NewSlotRange(MaxSlotID/3+1, MaxSlotID/3+1)
require.NoError(t, err)
shard, err = cluster.findShardIndexBySlot(*slotRange)
shard, err = cluster.findShardIndexBySlot(slotRange)
require.NoError(t, err)
require.Equal(t, 1, shard)

slotRange, err = NewSlotRange(MaxSlotID, MaxSlotID)
require.NoError(t, err)
shard, err = cluster.findShardIndexBySlot(*slotRange)
shard, err = cluster.findShardIndexBySlot(slotRange)
require.NoError(t, err)
require.Equal(t, 2, shard)
}
95 changes: 84 additions & 11 deletions store/slot.go
@@ -44,24 +44,26 @@ type SlotRange struct {

type SlotRanges []SlotRange

func NewSlotRange(start, stop int) (*SlotRange, error) {
type MigratingSlot struct {
SlotRange
IsMigrating bool
}

func NewSlotRange(start, stop int) (SlotRange, error) {
if start > stop {
return nil, errors.New("start was larger than stop")
return SlotRange{}, errors.New("start was larger than stop")
}
if (start < MinSlotID || start > MaxSlotID) ||
(stop < MinSlotID || stop > MaxSlotID) {
return nil, ErrSlotOutOfRange
return SlotRange{}, ErrSlotOutOfRange
}
return &SlotRange{
return SlotRange{
Start: start,
Stop: stop,
}, nil
}

func (slotRange *SlotRange) Equal(that *SlotRange) bool {
if that == nil {
return false
}
func (slotRange *SlotRange) Equal(that SlotRange) bool {
if slotRange.Start != that.Start {
return false
}
@@ -71,7 +73,7 @@ func (slotRange *SlotRange) Equal(that *SlotRange) bool {
return true
}

func (slotRange *SlotRange) HasOverlap(that *SlotRange) bool {
func (slotRange *SlotRange) HasOverlap(that SlotRange) bool {
return slotRange.Stop >= that.Start && slotRange.Start <= that.Stop
}

@@ -156,13 +158,84 @@ func (SlotRanges *SlotRanges) Contains(slot int) bool {

func (SlotRanges *SlotRanges) HasOverlap(slotRange SlotRange) bool {
for _, slotRange := range *SlotRanges {
if slotRange.HasOverlap(&slotRange) {
if slotRange.HasOverlap(slotRange) {
return true
}
}
return false
}

func (s *SlotRange) Reset() {
s.Start = 0
s.Stop = 0
}

// FromSlotRange will return a MigratingSlot with the IsMigrating field set to true.
// The IsMigrating field is expected to be set to false only by an unmarshal, such as when
// reading from the topology string.
func FromSlotRange(slotRange SlotRange) *MigratingSlot {
return &MigratingSlot{
SlotRange: slotRange,
IsMigrating: true,
}
}

func (s *MigratingSlot) UnmarshalJSON(data []byte) error {
var slotsString any
if err := json.Unmarshal(data, &slotsString); err != nil {
return err
}
switch t := slotsString.(type) {
case string:
slotRange := SlotRange{}
err := json.Unmarshal(data, &slotRange)
if err != nil {
s.Reset()
return err
}
s.SlotRange = slotRange
s.IsMigrating = true
case float64:
// We use integer to represent the slot because we don't support the slot range
// in the past. So we need to support the integer type for backward compatibility.
// But the number in JSON is float64, so we need to convert it to int here.
if t == NotMigratingInt {
s.Reset()
return nil
}
if t < MinSlotID || t > MaxSlotID {
s.Reset()
return ErrSlotOutOfRange
}
slotID := int(t)
s.Start = slotID
s.Stop = slotID
s.IsMigrating = true
default:
s.Reset()
return fmt.Errorf("invalid slot range type: %T", slotsString)
}
return nil
}

func (s *MigratingSlot) MarshalJSON() ([]byte, error) {
if !s.IsMigrating {
// backwards compatibility. When we read from an old cluster that had `-1`
// denoting !isMigrating. The MigratingSlot field will not be nil. So when
// this field is marshal'd back into JSON format, we can keep it as it was
// which was `-1`.
// The only case this turns back to null is if a migration happens on this
// shard, and the function `ClearMigrateState()` is called on the shard.
return json.Marshal(NotMigratingInt)
}
return json.Marshal(s.String())
}

func (s *MigratingSlot) Reset() {
s.SlotRange.Reset()
s.IsMigrating = false
}

// CanMerge will return true if the given SlotRanges are adjacent with each other
func CanMerge(a, b SlotRange) bool {
// Ensure a starts before b for easier comparison
@@ -218,7 +291,7 @@ func RemoveSlotFromSlotRanges(source SlotRanges, slot SlotRange) SlotRanges {
result := make([]SlotRange, 0, len(source))
for _, slotRange := range source {
// if no overlap, keep original range
if !slotRange.HasOverlap(&slot) {
if !slotRange.HasOverlap(slot) {
result = append(result, slotRange)
continue
}
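(Not part of the diff.) A small sketch of the JSON round trip that the UnmarshalJSON/MarshalJSON methods above are aiming for, assuming the store package from this PR; the exact string form emitted for an in-progress migration comes from SlotRange.String(), which is not shown in this hunk:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/apache/kvrocks-controller/store"
)

func main() {
	// Legacy form: -1 meant "not migrating" before slot ranges were supported.
	var legacy store.MigratingSlot
	if err := json.Unmarshal([]byte("-1"), &legacy); err != nil {
		panic(err)
	}
	fmt.Println(legacy.IsMigrating) // false

	// A legacy single-slot integer is widened to a one-slot range.
	var single store.MigratingSlot
	if err := json.Unmarshal([]byte("42"), &single); err != nil {
		panic(err)
	}
	fmt.Println(single.Start, single.Stop, single.IsMigrating) // 42 42 true

	// A non-migrating slot marshals back to -1 for backward compatibility.
	out, err := json.Marshal(&legacy)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // -1
}

When IsMigrating is true, MarshalJSON instead writes the string produced by SlotRange.String().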