From e2ff41a0f5c29bd88e7f1c35d659b60cc97ad40e Mon Sep 17 00:00:00 2001 From: git-hulk Date: Sat, 10 May 2025 16:21:08 +0800 Subject: [PATCH 01/15] Fix the slot range unmarshal should be compatible with integer --- store/slot.go | 23 ++++++++++++++++++----- store/slot_test.go | 20 ++++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/store/slot.go b/store/slot.go index 04a2b52b..bc3490cb 100644 --- a/store/slot.go +++ b/store/slot.go @@ -91,15 +91,28 @@ func (slotRange *SlotRange) MarshalJSON() ([]byte, error) { } func (slotRange *SlotRange) UnmarshalJSON(data []byte) error { - var slotsString string + var slotsString any if err := json.Unmarshal(data, &slotsString); err != nil { return err } - slotObject, err := ParseSlotRange(slotsString) - if err != nil { - return err + switch t := slotsString.(type) { + case string: + slotObject, err := ParseSlotRange(t) + if err != nil { + return err + } + *slotRange = *slotObject + case float64: + // JSON numbers are float64 by default + if t < MinSlotID || t > MaxSlotID { + return ErrSlotOutOfRange + } + slotID := int(t) + slotRange.Start = slotID + slotRange.Stop = slotID + default: + return fmt.Errorf("invalid slot range type: %T", slotsString) } - *slotRange = *slotObject return nil } diff --git a/store/slot_test.go b/store/slot_test.go index 9158198f..879ba743 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -20,6 +20,7 @@ package store import ( + "encoding/json" "testing" "github.com/apache/kvrocks-controller/consts" @@ -42,6 +43,25 @@ func TestSlotRange_String(t *testing.T) { assert.Equal(t, ErrSlotOutOfRange, err) } +func TestSlotRange_MarshalAndUnmarshalJSON(t *testing.T) { + slotBytes, err := json.Marshal(123) + require.NoError(t, err) + var slotRange SlotRange + err = json.Unmarshal(slotBytes, &slotRange) + require.NoError(t, err) + assert.Equal(t, SlotRange{Start: 123, Stop: 123}, slotRange) + + slotBytes, err = json.Marshal("456") + err = json.Unmarshal(slotBytes, &slotRange) + require.NoError(t, err) + assert.Equal(t, SlotRange{Start: 456, Stop: 456}, slotRange) + + slotBytes, err = json.Marshal("123-456") + err = json.Unmarshal(slotBytes, &slotRange) + require.NoError(t, err) + assert.Equal(t, SlotRange{Start: 123, Stop: 456}, slotRange) +} + func TestSlotRange_Parse(t *testing.T) { sr, err := ParseSlotRange("1-12") assert.Nil(t, err) From c28bfcfbd154c3652b6ff0ea6ada91797b917210 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Sat, 10 May 2025 16:25:40 +0800 Subject: [PATCH 02/15] Fix lint error --- store/slot_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/store/slot_test.go b/store/slot_test.go index 879ba743..ccc891b6 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -52,11 +52,13 @@ func TestSlotRange_MarshalAndUnmarshalJSON(t *testing.T) { assert.Equal(t, SlotRange{Start: 123, Stop: 123}, slotRange) slotBytes, err = json.Marshal("456") + require.NoError(t, err) err = json.Unmarshal(slotBytes, &slotRange) require.NoError(t, err) assert.Equal(t, SlotRange{Start: 456, Stop: 456}, slotRange) slotBytes, err = json.Marshal("123-456") + require.NoError(t, err) err = json.Unmarshal(slotBytes, &slotRange) require.NoError(t, err) assert.Equal(t, SlotRange{Start: 123, Stop: 456}, slotRange) From 6b6441a9075ffeeb312783604b64cf6081567aa5 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Sat, 10 May 2025 16:30:59 +0800 Subject: [PATCH 03/15] Add comment lines --- store/slot.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/store/slot.go b/store/slot.go index bc3490cb..5d5e6d8b 100644 --- a/store/slot.go +++ b/store/slot.go @@ -103,7 +103,9 @@ func (slotRange *SlotRange) UnmarshalJSON(data []byte) error { } *slotRange = *slotObject case float64: - // JSON numbers are float64 by default + // We use integer to represent the slot because we don't support the slot range + // in the past. So we need to support the integer type for backward compatibility. + // But the number in JSON is float64, so we need to convert it to int here. if t < MinSlotID || t > MaxSlotID { return ErrSlotOutOfRange } From 5b759ea328646d564e5bcc2fa9e8845a134afd40 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sat, 10 May 2025 13:35:53 -0600 Subject: [PATCH 04/15] adds MigratingSlot, still WIP --- controller/cluster.go | 13 +--- controller/cluster_test.go | 4 +- server/api/cluster.go | 2 +- server/api/cluster_test.go | 6 +- store/cluster.go | 4 +- store/cluster_node.go | 9 +-- store/cluster_shard.go | 25 +++++--- store/cluster_shard_test.go | 15 ++--- store/cluster_test.go | 6 +- store/slot.go | 115 ++++++++++++++++++++++++++---------- store/slot_test.go | 91 ++++++++++++++++++++-------- 11 files changed, 193 insertions(+), 97 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 2d6802e6..2cb08b5c 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -318,19 +318,12 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu if !shard.IsMigrating() { continue } - sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx) if err != nil { log.Error("Failed to get the cluster info from the source node", zap.Error(err)) return } - if sourceNodeClusterInfo.MigratingSlot == nil { - log.Error("The source migration slot is empty", - zap.String("migrating_slot", shard.MigratingSlot.String()), - ) - return - } - if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) { + if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) { log.Error("Mismatch migrating slot", zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()), zap.String("migrating_slot", shard.MigratingSlot.String()), @@ -355,9 +348,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu c.updateCluster(clonedCluster) log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) case "success": - clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot) + clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange) clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( - clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot, + clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange, ) migratedSlot := shard.MigratingSlot clonedCluster.Shards[i].ClearMigrateState() diff --git a/controller/cluster_test.go b/controller/cluster_test.go index d415f3f3..247ca03b 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) { mockNode0, mockNode1, mockNode2, mockNode3, }, SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, - MigratingSlot: nil, + MigratingSlot: store.MigratingSlot{IsMigrating: false}, TargetShardIndex: -1, }}, } @@ -221,7 +221,7 @@ func TestCluster_MigrateSlot(t *testing.T) { }() slotRange, err := store.NewSlotRange(0, 0) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, *slotRange, 1, false)) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false)) s := NewMockClusterStore() require.NoError(t, s.CreateCluster(ctx, ns, cluster)) diff --git a/server/api/cluster.go b/server/api/cluster.go index b539c907..b2236446 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -33,7 +33,7 @@ import ( type MigrateSlotRequest struct { Target int `json:"target" validate:"required"` - Slot store.SlotRange `json:"slot" validate:"required"` + Slot store.SlotRange `json:"slot" validate:"required"` // we don't use store.MigratingSlot here because we expect a valid SlotRange SlotOnly bool `json:"slot_only"` } diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index 30d781fc..cd67b9e8 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -129,7 +129,7 @@ func TestClusterBasics(t *testing.T) { slotRange, err := store.NewSlotRange(3, 3) require.NoError(t, err) testMigrateReq := &MigrateSlotRequest{ - Slot: *slotRange, + Slot: slotRange, SlotOnly: true, Target: 1, } @@ -237,7 +237,7 @@ func TestClusterMigrateData(t *testing.T) { slotRange, err := store.NewSlotRange(10, 10) require.NoError(t, err) testMigrateReq := &MigrateSlotRequest{ - Slot: *slotRange, + Slot: slotRange, Target: 1, } body, err := json.Marshal(testMigrateReq) @@ -272,6 +272,6 @@ func TestClusterMigrateData(t *testing.T) { if err != nil { return false } - return gotCluster.Shards[0].MigratingSlot == nil + return gotCluster.Shards[0].MigratingSlot.IsMigrating }, 10*time.Second, 100*time.Millisecond) } diff --git a/store/cluster.go b/store/cluster.go index b4bfd9e9..96695d0c 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -181,7 +181,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { for i := 0; i < len(cluster.Shards); i++ { slotRanges := cluster.Shards[i].SlotRanges for _, slotRange := range slotRanges { - if slotRange.HasOverlap(&slot) { + if slotRange.HasOverlap(slot) { if sourceShardIdx != -1 { return sourceShardIdx, consts.ErrSlotRangeBelongsToMultipleShards } @@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS } // Will start the data migration in the background - cluster.Shards[sourceShardIdx].MigratingSlot = &slot + cluster.Shards[sourceShardIdx].MigratingSlot = MigratingSlot{SlotRange: slot, IsMigrating: true} cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx return nil } diff --git a/store/cluster_node.go b/store/cluster_node.go index e16ae83c..13e62c3c 100644 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -85,9 +85,9 @@ type ClusterNode struct { } type ClusterInfo struct { - CurrentEpoch int64 `json:"cluster_current_epoch"` - MigratingSlot *SlotRange `json:"migrating_slot"` - MigratingState string `json:"migrating_state"` + CurrentEpoch int64 `json:"cluster_current_epoch"` + MigratingSlot MigratingSlot `json:"migrating_slot"` + MigratingState string `json:"migrating_state"` } type ClusterNodeInfo struct { @@ -195,10 +195,11 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) } case "migrating_slot", "migrating_slot(s)": // TODO(@git-hulk): handle multiple migrating slots - clusterInfo.MigratingSlot, err = ParseSlotRange(fields[1]) + slotRange, err := ParseSlotRange(fields[1]) if err != nil { return nil, err } + clusterInfo.MigratingSlot = FromSlotRange(*slotRange) case "migrating_state": clusterInfo.MigratingState = fields[1] } diff --git a/store/cluster_shard.go b/store/cluster_shard.go index 62c88269..cef5099c 100644 --- a/store/cluster_shard.go +++ b/store/cluster_shard.go @@ -33,11 +33,18 @@ import ( "github.com/apache/kvrocks-controller/consts" ) +const ( + NotMigratingString = "not-migrating" + // the old migrating slot was denoted by an int and -1 was + // used to denote a non migrating slot + NotMigratingInt = -1 +) + type Shard struct { - Nodes []Node `json:"nodes"` - SlotRanges []SlotRange `json:"slot_ranges"` - TargetShardIndex int `json:"target_shard_index"` - MigratingSlot *SlotRange `json:"migrating_slot"` + Nodes []Node `json:"nodes"` + SlotRanges []SlotRange `json:"slot_ranges"` + TargetShardIndex int `json:"target_shard_index"` + MigratingSlot MigratingSlot `json:"migrating_slot"` } type Shards []*Shard @@ -63,7 +70,7 @@ func NewShard() *Shard { return &Shard{ Nodes: make([]Node, 0), SlotRanges: make([]SlotRange, 0), - MigratingSlot: nil, + MigratingSlot: MigratingSlot{}, TargetShardIndex: -1, } } @@ -80,7 +87,7 @@ func (shard *Shard) Clone() *Shard { } func (shard *Shard) ClearMigrateState() { - shard.MigratingSlot = nil + shard.MigratingSlot.IsMigrating = false shard.TargetShardIndex = -1 } @@ -112,7 +119,7 @@ func (shard *Shard) addNode(addr, role, password string) error { } func (shard *Shard) IsMigrating() bool { - return shard.MigratingSlot != nil && shard.TargetShardIndex != -1 + return shard.MigratingSlot.IsMigrating && shard.TargetShardIndex != -1 } func (shard *Shard) GetMasterNode() Node { @@ -206,7 +213,7 @@ func (shard *Shard) promoteNewMaster(ctx context.Context, masterNodeID, preferre return preferredNewMasterNode.ID(), nil } -func (shard *Shard) HasOverlap(slotRange *SlotRange) bool { +func (shard *Shard) HasOverlap(slotRange SlotRange) bool { for _, shardSlotRange := range shard.SlotRanges { if shardSlotRange.HasOverlap(slotRange) { return true @@ -261,7 +268,7 @@ func (shard *Shard) UnmarshalJSON(bytes []byte) error { var data struct { SlotRanges []SlotRange `json:"slot_ranges"` TargetShardIndex int `json:"target_shard_index"` - MigratingSlot *SlotRange `json:"migrating_slot"` + MigratingSlot MigratingSlot `json:"migrating_slot"` Nodes []*ClusterNode `json:"nodes"` } if err := json.Unmarshal(bytes, &data); err != nil { diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go index 994046f2..7052d405 100644 --- a/store/cluster_shard_test.go +++ b/store/cluster_shard_test.go @@ -29,11 +29,11 @@ import ( func TestShard_HasOverlap(t *testing.T) { shard := NewShard() - slotRange := &SlotRange{Start: 0, Stop: 100} - shard.SlotRanges = append(shard.SlotRanges, *slotRange) + slotRange := SlotRange{Start: 0, Stop: 100} + shard.SlotRanges = append(shard.SlotRanges, slotRange) require.True(t, shard.HasOverlap(slotRange)) - require.True(t, shard.HasOverlap(&SlotRange{Start: 50, Stop: 150})) - require.False(t, shard.HasOverlap(&SlotRange{Start: 101, Stop: 150})) + require.True(t, shard.HasOverlap(SlotRange{Start: 50, Stop: 150})) + require.False(t, shard.HasOverlap(SlotRange{Start: 101, Stop: 150})) } func TestShard_Sort(t *testing.T) { @@ -57,16 +57,17 @@ func TestShard_IsServicing(t *testing.T) { var err error shard := NewShard() shard.TargetShardIndex = 0 - shard.MigratingSlot = nil + shard.MigratingSlot = MigratingSlot{IsMigrating: false} require.False(t, shard.IsServicing()) shard.TargetShardIndex = 0 - shard.MigratingSlot, err = NewSlotRange(1, 1) + slotRange, err := NewSlotRange(1, 1) require.Nil(t, err) + shard.MigratingSlot = FromSlotRange(slotRange) require.True(t, shard.IsServicing()) shard.TargetShardIndex = -1 - shard.MigratingSlot = nil + shard.MigratingSlot = MigratingSlot{IsMigrating: false} shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}} require.True(t, shard.IsServicing()) diff --git a/store/cluster_test.go b/store/cluster_test.go index 6b6e45c8..975f03f6 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -44,19 +44,19 @@ func TestCluster_FindIndexShardBySlot(t *testing.T) { slotRange, err := NewSlotRange(0, 0) require.NoError(t, err) - shard, err := cluster.findShardIndexBySlot(*slotRange) + shard, err := cluster.findShardIndexBySlot(slotRange) require.NoError(t, err) require.Equal(t, 0, shard) slotRange, err = NewSlotRange(MaxSlotID/3+1, MaxSlotID/3+1) require.NoError(t, err) - shard, err = cluster.findShardIndexBySlot(*slotRange) + shard, err = cluster.findShardIndexBySlot(slotRange) require.NoError(t, err) require.Equal(t, 1, shard) slotRange, err = NewSlotRange(MaxSlotID, MaxSlotID) require.NoError(t, err) - shard, err = cluster.findShardIndexBySlot(*slotRange) + shard, err = cluster.findShardIndexBySlot(slotRange) require.NoError(t, err) require.Equal(t, 2, shard) } diff --git a/store/slot.go b/store/slot.go index 5d5e6d8b..ba50b151 100644 --- a/store/slot.go +++ b/store/slot.go @@ -44,24 +44,26 @@ type SlotRange struct { type SlotRanges []SlotRange -func NewSlotRange(start, stop int) (*SlotRange, error) { +type MigratingSlot struct { + SlotRange + IsMigrating bool +} + +func NewSlotRange(start, stop int) (SlotRange, error) { if start > stop { - return nil, errors.New("start was larger than stop") + return SlotRange{}, errors.New("start was larger than stop") } if (start < MinSlotID || start > MaxSlotID) || (stop < MinSlotID || stop > MaxSlotID) { - return nil, ErrSlotOutOfRange + return SlotRange{}, ErrSlotOutOfRange } - return &SlotRange{ + return SlotRange{ Start: start, Stop: stop, }, nil } -func (slotRange *SlotRange) Equal(that *SlotRange) bool { - if that == nil { - return false - } +func (slotRange *SlotRange) Equal(that SlotRange) bool { if slotRange.Start != that.Start { return false } @@ -71,7 +73,7 @@ func (slotRange *SlotRange) Equal(that *SlotRange) bool { return true } -func (slotRange *SlotRange) HasOverlap(that *SlotRange) bool { +func (slotRange *SlotRange) HasOverlap(that SlotRange) bool { return slotRange.Stop >= that.Start && slotRange.Start <= that.Stop } @@ -91,30 +93,15 @@ func (slotRange *SlotRange) MarshalJSON() ([]byte, error) { } func (slotRange *SlotRange) UnmarshalJSON(data []byte) error { - var slotsString any + var slotsString string if err := json.Unmarshal(data, &slotsString); err != nil { return err } - switch t := slotsString.(type) { - case string: - slotObject, err := ParseSlotRange(t) - if err != nil { - return err - } - *slotRange = *slotObject - case float64: - // We use integer to represent the slot because we don't support the slot range - // in the past. So we need to support the integer type for backward compatibility. - // But the number in JSON is float64, so we need to convert it to int here. - if t < MinSlotID || t > MaxSlotID { - return ErrSlotOutOfRange - } - slotID := int(t) - slotRange.Start = slotID - slotRange.Stop = slotID - default: - return fmt.Errorf("invalid slot range type: %T", slotsString) + slotObject, err := ParseSlotRange(slotsString) + if err != nil { + return err } + *slotRange = *slotObject return nil } @@ -171,13 +158,79 @@ func (SlotRanges *SlotRanges) Contains(slot int) bool { func (SlotRanges *SlotRanges) HasOverlap(slotRange SlotRange) bool { for _, slotRange := range *SlotRanges { - if slotRange.HasOverlap(&slotRange) { + if slotRange.HasOverlap(slotRange) { return true } } return false } +func (s *SlotRange) Reset() { + s.Start = 0 + s.Stop = 0 +} + +// FromSlotRange will return a MigratingSlot with the IsMigrating set to true. +// only the cases where we unmarshal a MigratingSlot will have the potential for the +// IsMigrating field be set to false. +func FromSlotRange(slotRange SlotRange) MigratingSlot { + return MigratingSlot{ + SlotRange: slotRange, + IsMigrating: true, + } +} + +func (s *MigratingSlot) UnmarshalJSON(data []byte) error { + var slotsString any + if err := json.Unmarshal(data, &slotsString); err != nil { + return err + } + switch t := slotsString.(type) { + case string: + if strings.EqualFold(t, NotMigratingString) { + s.Reset() + return nil + } + slotRange := SlotRange{} + err := json.Unmarshal(data, &slotRange) + if err != nil { + return err + } + s.SlotRange = slotRange + s.IsMigrating = true + case float64: + if t == NotMigratingInt { + s.Reset() + return nil + } + // We use integer to represent the slot because we don't support the slot range + // in the past. So we need to support the integer type for backward compatibility. + // But the number in JSON is float64, so we need to convert it to int here. + if t < MinSlotID || t > MaxSlotID { + return ErrSlotOutOfRange + } + slotID := int(t) + s.Start = slotID + s.Stop = slotID + s.IsMigrating = true + default: + return fmt.Errorf("invalid slot range type: %T", slotsString) + } + return nil +} + +func (s *MigratingSlot) MarshalJSON() ([]byte, error) { + if s.IsMigrating { + return json.Marshal(NotMigratingString) + } + return json.Marshal(s.String()) +} + +func (s *MigratingSlot) Reset() { + s.SlotRange.Reset() + s.IsMigrating = false +} + // CanMerge will return true if the given SlotRanges are adjacent with each other func CanMerge(a, b SlotRange) bool { // Ensure a starts before b for easier comparison @@ -233,7 +286,7 @@ func RemoveSlotFromSlotRanges(source SlotRanges, slot SlotRange) SlotRanges { result := make([]SlotRange, 0, len(source)) for _, slotRange := range source { // if no overlap, keep original range - if !slotRange.HasOverlap(&slot) { + if !slotRange.HasOverlap(slot) { result = append(result, slotRange) continue } diff --git a/store/slot_test.go b/store/slot_test.go index ccc891b6..cca912b4 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -43,13 +43,54 @@ func TestSlotRange_String(t *testing.T) { assert.Equal(t, ErrSlotOutOfRange, err) } -func TestSlotRange_MarshalAndUnmarshalJSON(t *testing.T) { - slotBytes, err := json.Marshal(123) +func TestMigratingSlot_MarshalAndUnmarshalJSON(t *testing.T) { + var migratingSlot MigratingSlot + + slotBytes, err := json.Marshal(NotMigratingInt) + require.NoError(t, err) + err = json.Unmarshal(slotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) + + slotBytes, err = json.Marshal(-5) + require.NoError(t, err) + err = json.Unmarshal(slotBytes, &migratingSlot) + require.ErrorIs(t, err, ErrSlotOutOfRange, "-5 is not a valid 'not migrating' value") + assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) + + slotBytes, err = json.Marshal("456") + require.NoError(t, err) + err = json.Unmarshal(slotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 456, Stop: 456}, true}, migratingSlot) + + slotBytes, err = json.Marshal("123-456") require.NoError(t, err) + err = json.Unmarshal(slotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 123, Stop: 456}, true}, migratingSlot) + + slotBytes, err = json.Marshal(NotMigratingString) + require.NoError(t, err) + err = json.Unmarshal(slotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) +} + +func TestMigrateSlotRange_MarshalAndUnmarshalJSON(t *testing.T) { var slotRange SlotRange + + slotBytes, err := json.Marshal("-100") + require.NoError(t, err) err = json.Unmarshal(slotBytes, &slotRange) + require.NotNil(t, err, "expects error since input is a negative number") + assert.Equal(t, SlotRange{Start: 0, Stop: 0}, slotRange) + + slotBytes, err = json.Marshal("-100-100000") require.NoError(t, err) - assert.Equal(t, SlotRange{Start: 123, Stop: 123}, slotRange) + err = json.Unmarshal(slotBytes, &slotRange) + require.NotNil(t, err, "expects error since input is out of range") + assert.Equal(t, SlotRange{Start: 0, Stop: 0}, slotRange) slotBytes, err = json.Marshal("456") require.NoError(t, err) @@ -104,31 +145,31 @@ func TestAddSlotToSlotRanges(t *testing.T) { } slotRange, err := NewSlotRange(0, 0) require.NoError(t, err) - slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + slotRanges = AddSlotToSlotRanges(slotRanges, slotRange) require.Equal(t, 3, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0], slotRanges) slotRange, err = NewSlotRange(21, 21) require.NoError(t, err) - slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + slotRanges = AddSlotToSlotRanges(slotRanges, slotRange) require.Equal(t, 3, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0], slotRanges) slotRange, err = NewSlotRange(50, 50) require.NoError(t, err) - slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + slotRanges = AddSlotToSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1], slotRanges) slotRange, err = NewSlotRange(200, 200) require.NoError(t, err) - slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + slotRanges = AddSlotToSlotRanges(slotRanges, slotRange) require.Equal(t, 3, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2], slotRanges) slotRange, err = NewSlotRange(400, 400) require.NoError(t, err) - slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + slotRanges = AddSlotToSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3], slotRanges) } @@ -141,56 +182,56 @@ func TestRemoveSlotRanges(t *testing.T) { } slotRange, err := NewSlotRange(0, 0) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 3, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0], slotRanges) slotRange, err = NewSlotRange(21, 21) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 3, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0], slotRanges) slotRange, err = NewSlotRange(20, 20) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 3, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0], slotRanges) slotRange, err = NewSlotRange(150, 150) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1], slotRanges) slotRange, err = NewSlotRange(101, 101) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1], slotRanges) slotRange, err = NewSlotRange(199, 199) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2], slotRanges) slotRange, err = NewSlotRange(300, 300) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3], slotRanges) slotRange, err = NewSlotRange(298, 298) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 5, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3], slotRanges) require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4], slotRanges) slotRange, err = NewSlotRange(299, 299) require.NoError(t, err) - slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, slotRange) require.Equal(t, 4, len(slotRanges), slotRanges) require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3], slotRanges) } @@ -209,7 +250,7 @@ func TestSlotRange_HasOverlap(t *testing.T) { Stop int } type args struct { - that *SlotRange + that SlotRange } tests := []struct { name string @@ -220,43 +261,43 @@ func TestSlotRange_HasOverlap(t *testing.T) { { name: "0-5 does not overlap 6-7", fields: fields{Start: 0, Stop: 5}, - args: args{&SlotRange{Start: 6, Stop: 7}}, + args: args{SlotRange{Start: 6, Stop: 7}}, want: false, }, { name: "0-5 does overlap 3-4", fields: fields{Start: 0, Stop: 5}, - args: args{&SlotRange{Start: 3, Stop: 4}}, + args: args{SlotRange{Start: 3, Stop: 4}}, want: true, }, { name: "0-5 does overlap 5-8", fields: fields{Start: 0, Stop: 5}, - args: args{&SlotRange{Start: 5, Stop: 8}}, + args: args{SlotRange{Start: 5, Stop: 8}}, want: true, }, { name: "0-5 does overlap 4-8", fields: fields{Start: 0, Stop: 5}, - args: args{&SlotRange{Start: 4, Stop: 8}}, + args: args{SlotRange{Start: 4, Stop: 8}}, want: true, }, { name: "0-100 does not overlap 101-150", fields: fields{Start: 0, Stop: 100}, - args: args{&SlotRange{Start: 101, Stop: 150}}, + args: args{SlotRange{Start: 101, Stop: 150}}, want: false, }, { name: "50-100 does overlap 30-50", fields: fields{Start: 50, Stop: 100}, - args: args{&SlotRange{Start: 30, Stop: 50}}, + args: args{SlotRange{Start: 30, Stop: 50}}, want: true, }, { name: "50-100 does overlap 50-51", fields: fields{Start: 50, Stop: 100}, - args: args{&SlotRange{Start: 50, Stop: 51}}, + args: args{SlotRange{Start: 50, Stop: 51}}, want: true, }, } From 580fdbbc50e8ae33d1b799ca079deddcee99ad79 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sat, 10 May 2025 13:57:01 -0600 Subject: [PATCH 05/15] fixes some tests --- cmd/client/command/helper.go | 4 ++-- server/api/cluster_test.go | 5 ++++- server/api/shard_test.go | 5 ++++- store/slot.go | 2 +- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cmd/client/command/helper.go b/cmd/client/command/helper.go index 1d395ac1..043ca89b 100644 --- a/cmd/client/command/helper.go +++ b/cmd/client/command/helper.go @@ -51,8 +51,8 @@ func printCluster(cluster *store.Cluster) { role = strings.ToUpper(store.RoleMaster) } migratingStatus := "NO" - if shard.MigratingSlot != nil { - migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex) + if shard.MigratingSlot.IsMigrating { + migratingStatus = fmt.Sprintf("%s --> %d", &shard.MigratingSlot.SlotRange, shard.TargetShardIndex) } columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus} writer.Append(columns) diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index cd67b9e8..acabaf51 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -251,7 +251,10 @@ func TestClusterMigrateData(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 1, gotCluster.Version.Load()) require.Len(t, gotCluster.Shards[0].SlotRanges, 1) - require.EqualValues(t, &store.SlotRange{Start: 10, Stop: 10}, gotCluster.Shards[0].MigratingSlot) + require.EqualValues(t, + store.MigratingSlot{SlotRange: store.SlotRange{Start: 10, Stop: 10}, IsMigrating: true}, + gotCluster.Shards[0].MigratingSlot, + ) require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex) ctrl, err := controller.New(handler.s.(*store.ClusterStore), &config.ControllerConfig{ diff --git a/server/api/shard_test.go b/server/api/shard_test.go index cdaa89b6..a89d793f 100644 --- a/server/api/shard_test.go +++ b/server/api/shard_test.go @@ -126,7 +126,10 @@ func TestShardBasics(t *testing.T) { nodeAddrs = append(nodeAddrs, node.Addr()) } require.ElementsMatch(t, []string{"127.0.0.1:1235", "127.0.0.1:1236"}, nodeAddrs) - require.Nil(t, rsp.Data.Shard.MigratingSlot) + require.EqualValues(t, + rsp.Data.Shard.MigratingSlot, + store.MigratingSlot{SlotRange: store.SlotRange{Start: 0, Stop: 0}, IsMigrating: false}, + ) require.EqualValues(t, -1, rsp.Data.Shard.TargetShardIndex) }) diff --git a/store/slot.go b/store/slot.go index ba50b151..04be335d 100644 --- a/store/slot.go +++ b/store/slot.go @@ -220,7 +220,7 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { } func (s *MigratingSlot) MarshalJSON() ([]byte, error) { - if s.IsMigrating { + if !s.IsMigrating { return json.Marshal(NotMigratingString) } return json.Marshal(s.String()) From f78b6ae48edd5922a420bdb662936292d2f4b3d2 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sat, 10 May 2025 14:04:32 -0600 Subject: [PATCH 06/15] rewords the FromSlotRange comment --- store/slot.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/slot.go b/store/slot.go index 04be335d..3e15ed60 100644 --- a/store/slot.go +++ b/store/slot.go @@ -170,9 +170,9 @@ func (s *SlotRange) Reset() { s.Stop = 0 } -// FromSlotRange will return a MigratingSlot with the IsMigrating set to true. -// only the cases where we unmarshal a MigratingSlot will have the potential for the -// IsMigrating field be set to false. +// FromSlotRange will return a MigratingSlot with the IsMigrating field set to true. +// IsMigrating field would probably only be set to false from an unmarshal, like when +// reading from the topology string func FromSlotRange(slotRange SlotRange) MigratingSlot { return MigratingSlot{ SlotRange: slotRange, From e6772a653ba7899b2c54fff673cb906754bbe6ec Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sat, 10 May 2025 14:09:34 -0600 Subject: [PATCH 07/15] makes use of the MigratingSlot functions --- store/cluster.go | 2 +- store/cluster_shard.go | 2 +- store/slot.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/store/cluster.go b/store/cluster.go index 96695d0c..8dff6ed7 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS } // Will start the data migration in the background - cluster.Shards[sourceShardIdx].MigratingSlot = MigratingSlot{SlotRange: slot, IsMigrating: true} + cluster.Shards[sourceShardIdx].MigratingSlot = FromSlotRange(slot) cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx return nil } diff --git a/store/cluster_shard.go b/store/cluster_shard.go index cef5099c..f78eb893 100644 --- a/store/cluster_shard.go +++ b/store/cluster_shard.go @@ -87,7 +87,7 @@ func (shard *Shard) Clone() *Shard { } func (shard *Shard) ClearMigrateState() { - shard.MigratingSlot.IsMigrating = false + shard.MigratingSlot.Reset() shard.TargetShardIndex = -1 } diff --git a/store/slot.go b/store/slot.go index 3e15ed60..4a817ef2 100644 --- a/store/slot.go +++ b/store/slot.go @@ -199,13 +199,13 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { s.SlotRange = slotRange s.IsMigrating = true case float64: + // We use integer to represent the slot because we don't support the slot range + // in the past. So we need to support the integer type for backward compatibility. + // But the number in JSON is float64, so we need to convert it to int here. if t == NotMigratingInt { s.Reset() return nil } - // We use integer to represent the slot because we don't support the slot range - // in the past. So we need to support the integer type for backward compatibility. - // But the number in JSON is float64, so we need to convert it to int here. if t < MinSlotID || t > MaxSlotID { return ErrSlotOutOfRange } From e40cd579fc19f504690f6733c522659bd6d50ea9 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sat, 10 May 2025 14:11:57 -0600 Subject: [PATCH 08/15] adds clarity to a test --- store/slot_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/slot_test.go b/store/slot_test.go index cca912b4..751dab99 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -49,7 +49,7 @@ func TestMigratingSlot_MarshalAndUnmarshalJSON(t *testing.T) { slotBytes, err := json.Marshal(NotMigratingInt) require.NoError(t, err) err = json.Unmarshal(slotBytes, &migratingSlot) - require.NoError(t, err) + require.NoError(t, err, "expects no error since -1 was a valid 'not migrating' value") assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) slotBytes, err = json.Marshal(-5) From 32998f13e7edb3d2126b78a2a6b2c5fc77f3b12b Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sun, 11 May 2025 02:31:44 -0600 Subject: [PATCH 09/15] adds a case for an invalid string --- store/slot_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/store/slot_test.go b/store/slot_test.go index 751dab99..3c84b59d 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -75,6 +75,12 @@ func TestMigratingSlot_MarshalAndUnmarshalJSON(t *testing.T) { err = json.Unmarshal(slotBytes, &migratingSlot) require.NoError(t, err) assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) + + slotBytes, err = json.Marshal("invalid-string") + require.NoError(t, err) + err = json.Unmarshal(slotBytes, &migratingSlot) + require.Error(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) } func TestMigrateSlotRange_MarshalAndUnmarshalJSON(t *testing.T) { From a085743a8893acaee67767102f1c7353e5f438e2 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sun, 11 May 2025 02:55:19 -0600 Subject: [PATCH 10/15] remove old migratingSlot nil check to a false check instead --- server/api/cluster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index 1f55c06c..52e763a4 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -287,7 +287,7 @@ func TestClusterMigrateData(t *testing.T) { gotCluster, err = clusterStore.GetCluster(ctx, ns, clusterName) require.NoError(t, err) require.EqualValues(t, currentVersion+1, gotCluster.Version.Load()) - require.Nil(t, gotCluster.Shards[0].MigratingSlot) + require.False(t, gotCluster.Shards[0].MigratingSlot.IsMigrating) require.EqualValues(t, -1, gotCluster.Shards[0].TargetShardIndex) require.EqualValues(t, store.RemoveSlotFromSlotRanges(sourceSlotRanges, slotRange), gotCluster.Shards[0].SlotRanges) require.EqualValues(t, store.AddSlotToSlotRanges(targetSlotRanges, slotRange), gotCluster.Shards[1].SlotRanges) From 4a6ca37609d35f2ec30c9eb9936e63d0996346a3 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 12 May 2025 10:59:24 -0600 Subject: [PATCH 11/15] addresses git-hulk comment on making MigratinSlot be null if not migrating --- controller/cluster_test.go | 2 +- server/api/cluster_test.go | 4 ++-- server/api/shard_test.go | 5 +---- store/cluster_node.go | 6 +++--- store/cluster_shard.go | 17 ++++++++--------- store/cluster_shard_test.go | 8 ++++++-- store/slot.go | 12 +++--------- store/slot_test.go | 6 ------ 8 files changed, 24 insertions(+), 36 deletions(-) diff --git a/controller/cluster_test.go b/controller/cluster_test.go index 247ca03b..a1a720a6 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) { mockNode0, mockNode1, mockNode2, mockNode3, }, SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, - MigratingSlot: store.MigratingSlot{IsMigrating: false}, + MigratingSlot: &store.MigratingSlot{IsMigrating: false}, TargetShardIndex: -1, }}, } diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index 52e763a4..5d3aa8d6 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -279,7 +279,7 @@ func TestClusterMigrateData(t *testing.T) { require.Eventually(t, func() bool { gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster") require.NoError(t, err) - return !gotCluster.Shards[0].MigratingSlot.IsMigrating + return gotCluster.Shards[0].MigratingSlot == nil }, 10*time.Second, 100*time.Millisecond) controller.Close() @@ -287,7 +287,7 @@ func TestClusterMigrateData(t *testing.T) { gotCluster, err = clusterStore.GetCluster(ctx, ns, clusterName) require.NoError(t, err) require.EqualValues(t, currentVersion+1, gotCluster.Version.Load()) - require.False(t, gotCluster.Shards[0].MigratingSlot.IsMigrating) + require.Nil(t, gotCluster.Shards[0].MigratingSlot) require.EqualValues(t, -1, gotCluster.Shards[0].TargetShardIndex) require.EqualValues(t, store.RemoveSlotFromSlotRanges(sourceSlotRanges, slotRange), gotCluster.Shards[0].SlotRanges) require.EqualValues(t, store.AddSlotToSlotRanges(targetSlotRanges, slotRange), gotCluster.Shards[1].SlotRanges) diff --git a/server/api/shard_test.go b/server/api/shard_test.go index 8af88bf0..967400f6 100644 --- a/server/api/shard_test.go +++ b/server/api/shard_test.go @@ -126,10 +126,7 @@ func TestShardBasics(t *testing.T) { nodeAddrs = append(nodeAddrs, node.Addr()) } require.ElementsMatch(t, []string{"127.0.0.1:1235", "127.0.0.1:1236"}, nodeAddrs) - require.EqualValues(t, - rsp.Data.Shard.MigratingSlot, - store.MigratingSlot{SlotRange: store.SlotRange{Start: 0, Stop: 0}, IsMigrating: false}, - ) + require.Nil(t, rsp.Data.Shard.MigratingSlot) require.EqualValues(t, -1, rsp.Data.Shard.TargetShardIndex) }) diff --git a/store/cluster_node.go b/store/cluster_node.go index 91fc4c9d..8d038d0d 100644 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -85,9 +85,9 @@ type ClusterNode struct { } type ClusterInfo struct { - CurrentEpoch int64 `json:"cluster_current_epoch"` - MigratingSlot MigratingSlot `json:"migrating_slot"` - MigratingState string `json:"migrating_state"` + CurrentEpoch int64 `json:"cluster_current_epoch"` + MigratingSlot *MigratingSlot `json:"migrating_slot"` + MigratingState string `json:"migrating_state"` } type ClusterNodeInfo struct { diff --git a/store/cluster_shard.go b/store/cluster_shard.go index f78eb893..69ffd4a5 100644 --- a/store/cluster_shard.go +++ b/store/cluster_shard.go @@ -34,17 +34,16 @@ import ( ) const ( - NotMigratingString = "not-migrating" // the old migrating slot was denoted by an int and -1 was // used to denote a non migrating slot NotMigratingInt = -1 ) type Shard struct { - Nodes []Node `json:"nodes"` - SlotRanges []SlotRange `json:"slot_ranges"` - TargetShardIndex int `json:"target_shard_index"` - MigratingSlot MigratingSlot `json:"migrating_slot"` + Nodes []Node `json:"nodes"` + SlotRanges []SlotRange `json:"slot_ranges"` + TargetShardIndex int `json:"target_shard_index"` + MigratingSlot *MigratingSlot `json:"migrating_slot"` } type Shards []*Shard @@ -70,7 +69,7 @@ func NewShard() *Shard { return &Shard{ Nodes: make([]Node, 0), SlotRanges: make([]SlotRange, 0), - MigratingSlot: MigratingSlot{}, + MigratingSlot: nil, TargetShardIndex: -1, } } @@ -87,7 +86,7 @@ func (shard *Shard) Clone() *Shard { } func (shard *Shard) ClearMigrateState() { - shard.MigratingSlot.Reset() + shard.MigratingSlot = nil shard.TargetShardIndex = -1 } @@ -119,7 +118,7 @@ func (shard *Shard) addNode(addr, role, password string) error { } func (shard *Shard) IsMigrating() bool { - return shard.MigratingSlot.IsMigrating && shard.TargetShardIndex != -1 + return shard.MigratingSlot != nil && shard.MigratingSlot.IsMigrating && shard.TargetShardIndex != -1 } func (shard *Shard) GetMasterNode() Node { @@ -268,7 +267,7 @@ func (shard *Shard) UnmarshalJSON(bytes []byte) error { var data struct { SlotRanges []SlotRange `json:"slot_ranges"` TargetShardIndex int `json:"target_shard_index"` - MigratingSlot MigratingSlot `json:"migrating_slot"` + MigratingSlot *MigratingSlot `json:"migrating_slot"` Nodes []*ClusterNode `json:"nodes"` } if err := json.Unmarshal(bytes, &data); err != nil { diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go index 7052d405..1406f351 100644 --- a/store/cluster_shard_test.go +++ b/store/cluster_shard_test.go @@ -57,7 +57,11 @@ func TestShard_IsServicing(t *testing.T) { var err error shard := NewShard() shard.TargetShardIndex = 0 - shard.MigratingSlot = MigratingSlot{IsMigrating: false} + shard.MigratingSlot = &MigratingSlot{IsMigrating: false} + require.False(t, shard.IsServicing()) + + shard.TargetShardIndex = 0 + shard.MigratingSlot = nil require.False(t, shard.IsServicing()) shard.TargetShardIndex = 0 @@ -67,7 +71,7 @@ func TestShard_IsServicing(t *testing.T) { require.True(t, shard.IsServicing()) shard.TargetShardIndex = -1 - shard.MigratingSlot = MigratingSlot{IsMigrating: false} + shard.MigratingSlot = &MigratingSlot{IsMigrating: false} shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}} require.True(t, shard.IsServicing()) diff --git a/store/slot.go b/store/slot.go index 4a817ef2..7c753839 100644 --- a/store/slot.go +++ b/store/slot.go @@ -173,8 +173,8 @@ func (s *SlotRange) Reset() { // FromSlotRange will return a MigratingSlot with the IsMigrating field set to true. // IsMigrating field would probably only be set to false from an unmarshal, like when // reading from the topology string -func FromSlotRange(slotRange SlotRange) MigratingSlot { - return MigratingSlot{ +func FromSlotRange(slotRange SlotRange) *MigratingSlot { + return &MigratingSlot{ SlotRange: slotRange, IsMigrating: true, } @@ -187,13 +187,10 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { } switch t := slotsString.(type) { case string: - if strings.EqualFold(t, NotMigratingString) { - s.Reset() - return nil - } slotRange := SlotRange{} err := json.Unmarshal(data, &slotRange) if err != nil { + s.Reset() return err } s.SlotRange = slotRange @@ -220,9 +217,6 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { } func (s *MigratingSlot) MarshalJSON() ([]byte, error) { - if !s.IsMigrating { - return json.Marshal(NotMigratingString) - } return json.Marshal(s.String()) } diff --git a/store/slot_test.go b/store/slot_test.go index 3c84b59d..83c56af1 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -70,12 +70,6 @@ func TestMigratingSlot_MarshalAndUnmarshalJSON(t *testing.T) { require.NoError(t, err) assert.Equal(t, MigratingSlot{SlotRange{Start: 123, Stop: 456}, true}, migratingSlot) - slotBytes, err = json.Marshal(NotMigratingString) - require.NoError(t, err) - err = json.Unmarshal(slotBytes, &migratingSlot) - require.NoError(t, err) - assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) - slotBytes, err = json.Marshal("invalid-string") require.NoError(t, err) err = json.Unmarshal(slotBytes, &migratingSlot) From c2f6f5eec3adf30936c7037844885d70d42cfe60 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 12 May 2025 13:18:42 -0600 Subject: [PATCH 12/15] fixes scenario where -1 turns into 0 --- store/slot.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/store/slot.go b/store/slot.go index 7c753839..097aa43a 100644 --- a/store/slot.go +++ b/store/slot.go @@ -217,6 +217,15 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { } func (s *MigratingSlot) MarshalJSON() ([]byte, error) { + if !s.IsMigrating { + // backwards compatibility. When we read from an old cluster that had `-1` + // denoting !isMigrating. The MigratingSlot field will not be nil. So when + // this field is marshal'd back into JSON format, we can keep it as it was + // which was `-1`. + // The only case this turns back to null is if a migration happens on this + // shard, and the function `ClearMigrateState()` is called on the shard. + return json.Marshal(NotMigratingInt) + } return json.Marshal(s.String()) } From b4a0833f66030dee739091a4576c4e93ee7a0e15 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 12 May 2025 14:36:25 -0600 Subject: [PATCH 13/15] adds more tests --- store/slot.go | 2 ++ store/slot_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/store/slot.go b/store/slot.go index 097aa43a..448002eb 100644 --- a/store/slot.go +++ b/store/slot.go @@ -204,6 +204,7 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { return nil } if t < MinSlotID || t > MaxSlotID { + s.Reset() return ErrSlotOutOfRange } slotID := int(t) @@ -211,6 +212,7 @@ func (s *MigratingSlot) UnmarshalJSON(data []byte) error { s.Stop = slotID s.IsMigrating = true default: + s.Reset() return fmt.Errorf("invalid slot range type: %T", slotsString) } return nil diff --git a/store/slot_test.go b/store/slot_test.go index 83c56af1..119ab097 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -43,15 +43,17 @@ func TestSlotRange_String(t *testing.T) { assert.Equal(t, ErrSlotOutOfRange, err) } -func TestMigratingSlot_MarshalAndUnmarshalJSON(t *testing.T) { +func TestMigratingSlot_UnmarshalJSON(t *testing.T) { var migratingSlot MigratingSlot + migratingSlot = MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 5}, IsMigrating: true} // to set values to migratingSlot slotBytes, err := json.Marshal(NotMigratingInt) require.NoError(t, err) err = json.Unmarshal(slotBytes, &migratingSlot) require.NoError(t, err, "expects no error since -1 was a valid 'not migrating' value") assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) + migratingSlot = MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 5}, IsMigrating: true} // to set values to migratingSlot slotBytes, err = json.Marshal(-5) require.NoError(t, err) err = json.Unmarshal(slotBytes, &migratingSlot) @@ -77,6 +79,52 @@ func TestMigratingSlot_MarshalAndUnmarshalJSON(t *testing.T) { assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) } +// TestMigratingSlot_MarshalUnmarshalJSON will check that we can marshal and then unmarshal +// back into the MigratingSlot +func TestMigratingSlot_MarshalUnmarshalJSON(t *testing.T) { + migratingSlot := MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 5}, IsMigrating: true} + migratingSlotBytes, err := json.Marshal(&migratingSlot) + require.NoError(t, err) + err = json.Unmarshal(migratingSlotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 5, Stop: 5}, true}, migratingSlot) + + // tests that we can marshal isMigrating = false, which results in -1, and then unmarshal it + // to be isMigrating = false again + migratingSlot = MigratingSlot{SlotRange: SlotRange{Start: 0, Stop: 0}, IsMigrating: false} + migratingSlotBytes, err = json.Marshal(&migratingSlot) + require.NoError(t, err) + err = json.Unmarshal(migratingSlotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot) + + // same test as earlier, but checks that it resets the start and stop + migratingSlot = MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 5}, IsMigrating: false} + migratingSlotBytes, err = json.Marshal(&migratingSlot) + require.NoError(t, err) + err = json.Unmarshal(migratingSlotBytes, &migratingSlot) + require.NoError(t, err) + assert.Equal(t, MigratingSlot{SlotRange{Start: 0, Stop: 0}, false}, migratingSlot, "expects start and stop to reset to 0") +} + +// TestMigratingSlot_MarshalJSON will checks the resulting string +func TestMigratingSlot_MarshalJSON(t *testing.T) { + migratingSlot := MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 5}, IsMigrating: true} + migratingSlotBytes, err := json.Marshal(&migratingSlot) + require.NoError(t, err) + assert.Equal(t, `"5"`, string(migratingSlotBytes)) + + migratingSlot = MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 10}, IsMigrating: true} + migratingSlotBytes, err = json.Marshal(&migratingSlot) + require.NoError(t, err) + assert.Equal(t, `"5-10"`, string(migratingSlotBytes)) + + migratingSlot = MigratingSlot{SlotRange: SlotRange{Start: 5, Stop: 10}, IsMigrating: false} + migratingSlotBytes, err = json.Marshal(&migratingSlot) + require.NoError(t, err) + assert.Equal(t, `-1`, string(migratingSlotBytes)) +} + func TestMigrateSlotRange_MarshalAndUnmarshalJSON(t *testing.T) { var slotRange SlotRange From 57ee490b7cfb35effbe6d8bcd286242fa05707c4 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 12 May 2025 14:49:13 -0600 Subject: [PATCH 14/15] removes the & --- cmd/client/command/helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/client/command/helper.go b/cmd/client/command/helper.go index 043ca89b..aa579a1a 100644 --- a/cmd/client/command/helper.go +++ b/cmd/client/command/helper.go @@ -52,7 +52,7 @@ func printCluster(cluster *store.Cluster) { } migratingStatus := "NO" if shard.MigratingSlot.IsMigrating { - migratingStatus = fmt.Sprintf("%s --> %d", &shard.MigratingSlot.SlotRange, shard.TargetShardIndex) + migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex) } columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus} writer.Append(columns) From 0dda5ab90344212420d6d6942531a13c729ab982 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 12 May 2025 15:20:13 -0600 Subject: [PATCH 15/15] helper to use shard.IsMigrating function --- cmd/client/command/helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/client/command/helper.go b/cmd/client/command/helper.go index aa579a1a..ade4f35c 100644 --- a/cmd/client/command/helper.go +++ b/cmd/client/command/helper.go @@ -51,7 +51,7 @@ func printCluster(cluster *store.Cluster) { role = strings.ToUpper(store.RoleMaster) } migratingStatus := "NO" - if shard.MigratingSlot.IsMigrating { + if shard.IsMigrating() { migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex) } columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus}